zoukankan      html  css  js  c++  java
  • 第十四章:实现自定义的编码解码器


    本文转自互联网博客,在此感谢博主的总结归纳,本文只提供于巩固复习使用

    本章讲述Netty中如何轻松实现定制的编解码器,由于Netty架构的灵活性,这些编解码器易于重用和测试。为了更容易实现,使用Memcached作为协议例子是因为它更方便我们实现。

    Memcached是免费开源、高性能、分布式的内存对象缓存系统,其目的是加速动态Web应用程序的响应,减轻数据库负载;Memcache实际上是一个以key-value存储任意数据的内存小块。可能有人会问“为什么使用Memcached?因为Memcached协议非常简单,便于讲解。

    14.1 编解码器的范围

    ​ 我们将只实现Memcached协议的一个子集,这足够我们进行添加、检索、删除对象;在Memcached中是通过执行SET,GET,DELETE命令来实现的。Memcached支持很多其他的命令,但我们只使用其中三个命令,简单的东西,我们才会理解的更清楚。

    ​ Memcached有一个二进制和纯文本协议,它们都可以用来与Memcached服务器通信,使用什么类型的协议取决于服务器支持哪些协议。本章主要关注实现二进制协议,因为二进制在网络编程中最常用。

    14.2 实现Memcached的编解码器

    ​ 当想要实现一个给定协议的编解码器,我们应该花一些事件来了解它的运作原理。通常情况下,协议本身都有一些详细的记录。在这里你会发现多少细节?幸运的是Memcached的二进制协议可以很好的扩展。

    ​ 在RFC中有相应的规范,并提供了Memcached二进制协议下载地址:http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped。我们不会执行Memcached的所有命令,只会执行三种操作:SET,GET和DELETE。这样做事为了让事情变得简单。

    ###14.3 了解Memcached二进制协议

    ​ 可以在http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped上详细了解Memcached二进制协议结构。不过这个网站如果不FQ的话好像访问不了。

    14.4 Netty编码器和解码器

    14.4.1 实现Memcached编码器

    先定义memcached操作码(Opcode)和响应状态码(Status):

    package netty.in.action.mem;
     
    /**
     * memcached operation codes
     * @author c.king
     *
     */
    public class Opcode {
    	
    	public static final byte GET = 0x00;
    	public static final byte SET = 0x01;
    	public static final byte DELETE = 0x04;
    }
    
    package netty.in.action.mem;
     
    /**
     * memcached response statuses
     * @author c.king
     *
     */
    public class Status {
    	
    	public static final short NO_ERROR = 0x0000;
    	public static final short KEY_NOT_FOUND = 0x0001;
    	public static final short KEY_EXISTS = 0x0002;
    	public static final short VALUE_TOO_LARGE = 0x0003;
    	public static final short INVALID_ARGUMENTS = 0x0004;
    	public static final short ITEM_NOT_STORED = 0x0005;
    	public static final short INC_DEC_NON_NUM_VAL = 0x0006;
    }
    

    继续编写memcached请求消息体:

    package netty.in.action.mem;
     
    import java.util.Random;
     
    /**
     * memcached request message object
     * @author c.king
     *
     */
    public class MemcachedRequest {
     
    	private static final Random rand = new Random();
    	private int magic = 0x80;// fixed so hard coded
    	private byte opCode; // the operation e.g. set or get
    	private String key; // the key to delete, get or set
    	private int flags = 0xdeadbeef; // random
    	private int expires; // 0 = item never expires
    	private String body; // if opCode is set, the value
    	private int id = rand.nextInt(); // Opaque
    	private long cas; // data version check...not used
    	private boolean hasExtras; // not all ops have extras
     
    	public MemcachedRequest(byte opcode, String key, String value) {
    		this.opCode = opcode;
    		this.key = key;
    		this.body = value == null ? "" : value;
    		// only set command has extras in our example
    		hasExtras = opcode == Opcode.SET;
    	}
     
    	public MemcachedRequest(byte opCode, String key) {
    		this(opCode, key, null);
    	}
     
    	public int getMagic() {
    		return magic;
    	}
     
    	public byte getOpCode() {
    		return opCode;
    	}
     
    	public String getKey() {
    		return key;
    	}
     
    	public int getFlags() {
    		return flags;
    	}
     
    	public int getExpires() {
    		return expires;
    	}
     
    	public String getBody() {
    		return body;
    	}
     
    	public int getId() {
    		return id;
    	}
     
    	public long getCas() {
    		return cas;
    	}
     
    	public boolean isHasExtras() {
    		return hasExtras;
    	}
    }
    

    最后编写memcached请求编码器:

    package netty.in.action.mem;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import io.netty.util.CharsetUtil;
     
    /**
     * memcached request encoder
     * @author c.king
     *
     */
    public class MemcachedRequestEncoder extends MessageToByteEncoder<MemcachedRequest> {
     
    	@Override
    	protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, ByteBuf out)
    			throws Exception {
    		// convert key and body to bytes array
    		byte[] key = msg.getKey().getBytes(CharsetUtil.UTF_8);
    		byte[] body = msg.getBody().getBytes(CharsetUtil.UTF_8);
    		// total size of body = key size + body size + extras size
    		int bodySize = key.length + body.length + (msg.isHasExtras() ? 8 : 0);
    		// write magic int
    		out.writeInt(msg.getMagic());
    		// write opcode byte
    		out.writeByte(msg.getOpCode());
    		// write key length (2 byte) i.e a Java short
    		out.writeShort(key.length);
    		// write extras length (1 byte)
    		int extraSize = msg.isHasExtras() ? 0x08 : 0x0;
    		out.writeByte(extraSize);
    		// byte is the data type, not currently implemented in Memcached
    		// but required
    		out.writeByte(0);
    		// next two bytes are reserved, not currently implemented
    		// but are required
    		out.writeShort(0);
    		// write total body length ( 4 bytes - 32 bit int)
    		out.writeInt(bodySize);
    		// write opaque ( 4 bytes) - a 32 bit int that is returned
    		// in the response
    		out.writeInt(msg.getId());
    		// write CAS ( 8 bytes)
    		// 24 byte header finishes with the CAS
    		out.writeLong(msg.getCas());
    		if(msg.isHasExtras()){
    			// write extras
    			// (flags and expiry, 4 bytes each), 8 bytes total
    			out.writeInt(msg.getFlags());
    			out.writeInt(msg.getExpires());
    		}
    		//write key
    		out.writeBytes(key);
    		//write value
    		out.writeBytes(body);
    	}
    }
    

    14.4.2 实现Memcached解码器

    编写memcached响应消息体:

    package netty.in.action.mem;
     
    /**
     * memcached response message object
     * @author c.king
     *
     */
    public class MemcachedResponse {
     
    	private byte magic;
    	private byte opCode;
    	private byte dataType;
    	private short status;
    	private int id;
    	private long cas;
    	private int flags;
    	private int expires;
    	private String key;
    	private String data;
     
    	public MemcachedResponse(byte magic, byte opCode, byte dataType, short status,
    			int id, long cas, int flags, int expires, String key, String data) {
    		this.magic = magic;
    		this.opCode = opCode;
    		this.dataType = dataType;
    		this.status = status;
    		this.id = id;
    		this.cas = cas;
    		this.flags = flags;
    		this.expires = expires;
    		this.key = key;
    		this.data = data;
    	}
     
    	public byte getMagic() {
    		return magic;
    	}
     
    	public byte getOpCode() {
    		return opCode;
    	}
     
    	public byte getDataType() {
    		return dataType;
    	}
     
    	public short getStatus() {
    		return status;
    	}
     
    	public int getId() {
    		return id;
    	}
     
    	public long getCas() {
    		return cas;
    	}
     
    	public int getFlags() {
    		return flags;
    	}
     
    	public int getExpires() {
    		return expires;
    	}
     
    	public String getKey() {
    		return key;
    	}
     
    	public String getData() {
    		return data;
    	}
    }
    

    编写memcached响应解码器:

    package netty.in.action.mem;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.util.CharsetUtil;
     
    import java.util.List;
     
    public class MemcachedResponseDecoder extends ByteToMessageDecoder {
     
    	private enum State {
    		Header, Body
    	}
     
    	private State state = State.Header;
    	private int totalBodySize;
    	private byte magic;
    	private byte opCode;
    	private short keyLength;
    	private byte extraLength;
    	private byte dataType;
    	private short status;
    	private int id;
    	private long cas;
     
    	@Override
    	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    			throws Exception {
    		switch (state) {
    		case Header:
    			// response header is 24 bytes
    			if (in.readableBytes() < 24) {
    				return;
    			}
    			// read header
    			magic = in.readByte();
    			opCode = in.readByte();
    			keyLength = in.readShort();
    			extraLength = in.readByte();
    			dataType = in.readByte();
    			status = in.readShort();
    			totalBodySize = in.readInt();
    			id = in.readInt();
    			cas = in.readLong();
    			state = State.Body;
    			break;
    		case Body:
    			if (in.readableBytes() < totalBodySize) {
    				return;
    			}
    			int flags = 0;
    			int expires = 0;
    			int actualBodySize = totalBodySize;
    			if (extraLength > 0) {
    				flags = in.readInt();
    				actualBodySize -= 4;
    			}
    			if (extraLength > 4) {
    				expires = in.readInt();
    				actualBodySize -= 4;
    			}
    			String key = "";
    			if (keyLength > 0) {
    				ByteBuf keyBytes = in.readBytes(keyLength);
    				key = keyBytes.toString(CharsetUtil.UTF_8);
    				actualBodySize -= keyLength;
    			}
    			ByteBuf body = in.readBytes(actualBodySize);
    			String data = body.toString(CharsetUtil.UTF_8);
    			out.add(new MemcachedResponse(magic, opCode, dataType, status,
    					id, cas, flags, expires, key, data));
    			state = State.Header;
    			break;
    		default:
    			break;
    		}
    	}
    }
    

    14.5 测试编解码器

    基于netty的编解码器都写完了,下面我们来写一个测试它的类:

    package netty.in.action.mem;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.util.CharsetUtil;
     
    import org.junit.Assert;
    import org.junit.Test;
     
    /**
     * test memcached encoder
     * @author c.king
     *
     */
    public class MemcachedRequestEncoderTest {
     
    	@Test
    	public void testMemcachedRequestEncoder() {
    		MemcachedRequest request = new MemcachedRequest(Opcode.SET, "k1", "v1");
    		EmbeddedChannel channel = new EmbeddedChannel(
    				new MemcachedRequestEncoder());
    		Assert.assertTrue(channel.writeOutbound(request));
    		ByteBuf encoded = (ByteBuf) channel.readOutbound();
    		Assert.assertNotNull(encoded);
    		Assert.assertEquals(request.getMagic(), encoded.readInt());
    		Assert.assertEquals(request.getOpCode(), encoded.readByte());
    		Assert.assertEquals(2, encoded.readShort());
    		Assert.assertEquals((byte) 0x08, encoded.readByte());
    		Assert.assertEquals((byte) 0, encoded.readByte());
    		Assert.assertEquals(0, encoded.readShort());
    		Assert.assertEquals(2 + 2 + 8, encoded.readInt());
    		Assert.assertEquals(request.getId(), encoded.readInt());
    		Assert.assertEquals(request.getCas(), encoded.readLong());
    		Assert.assertEquals(request.getFlags(), encoded.readInt());
    		Assert.assertEquals(request.getExpires(), encoded.readInt());
    		byte[] data = new byte[encoded.readableBytes()];
    		encoded.readBytes(data);
    		Assert.assertArrayEquals((request.getKey() + request.getBody())
    				.getBytes(CharsetUtil.UTF_8), data);
    		Assert.assertFalse(encoded.isReadable());
    		Assert.assertFalse(channel.finish());
    		Assert.assertNull(channel.readInbound());
    	}
    }
    

    14.6 Summary

    本章主要是使用netty写了个模拟memcached二进制协议的处理。至于memcached二进制协议具体是个啥玩意,可以单独了解,这里也没有详细说明。

  • 相关阅读:
    unix domain socket 浅析
    Python单元测试的Mock是怎么回事
    三招搞定你的ubuntu安全问题
    思考一次整体调整Python项目规范性的过程
    不可缺少的程序埋点
    python + unittest + request + parameterized 参数化遇到中文名称testcase不显示的问题
    【CDH】cdh搭建遇到的坑和解决过程
    [Linux系统]安装时出现Requires: libc.so.6(GLIBC_2.17)(64bit) Requires: systemd Requires: libstdc++.so时解决办法
    【Linux命令】在Linux服务器上与windows通过SCP命令互传文件时出现的问题排查过程
    【微信公众号】记一次微信活动微信公众号分享没有LOGO的解决心路历程
  • 原文地址:https://www.cnblogs.com/daozhangblog/p/12446381.html
Copyright © 2011-2022 走看看