一个仿Redis的内存数据库(主要用来做命令解析)服务端,
Redis 简介:Redis
是一个开源的底层使用C语言编写的key-value
内存数据库。可用于缓存数据、事件发布订阅、高速队列等场景,而且支持丰富的数据类型:string
(字符串)、hash
(哈希)、list
(列表)、set
(无序集合)、zset
(有序集合)
使用场景:
使用场景
随着数据量的增长,MySQL已经满足不了大型互联网类应用的需求。因此,Redis基于内存存储数据,在某些场景下,就会大大提高效率。
缓存:对于热点数据,缓存以后可能读取数十万次,因此对于热点数据,不但缓存的价值非常大,而且当总数据量比较大的时候直接从数据库中查询会比较影响性能。例如:对经常需要查询且变动不是很频繁的数据,可以考虑基于Redis实现缓存。
会话缓存:Redis还可以进行会话缓存。例如:将web session存放在Redis中。
计数器:因为Redis具有原子性,所以在某些方面可以避免并发问题,比如:统计点击率、点赞率、收藏率等。
消息队列:Redis能作为一个很好的消息队列来使用,依赖List类型利用LPUSH命令将数据添加到链表头部,通过BRPOP命令将元素从链表尾部取出。
社交列表:社交属性相关的列表信息,例如,用户点赞列表、用户分享列表、用户收藏列表、用户关注列表、用户粉丝列表等,使用Hash类型数据结构是个不错的选择。
最新动态:按照时间顺序排列的最新动态,也是一个很好的应用,可以使用Sorted Set类型的分数权重存储时间戳进行排序。
Redis数据类型
String(字符串)
String是Redis的最基本数据结构,以一个键和一个值存储在Redis内部,类似java的Map结构,可以通过键去找值。
Hash(哈希)
Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象,类似Java里面的Map<String, Object>。
List(列表)
根据插入顺序排序的字符串元素的集合,底层实现是链表。
Set(集合)
Redis的Set是string类型的无序集合,它是通过HashTable实现的。
zset(sorted set:有序集合)
Redis zset和set一样也是string类型元素的集合,且不允许重复。但每个字符串元素与浮点数值相关联,称为分数,元素总是按其分数排序,可以检索一系列元素。
Redis协议规范
Redis客户端使用名为RESP(REdis序列化协议)的协议与Redis服务器通信。虽然该协议是专为Redis设计的,但它可以用于其他客户端 - 服务器软件项目。
RESP可以序列化不同的数据类型,如整数,字符串,数组,还有一种特定的错误类型。请求从客户端发送到Redis服务器,作为表示要执行的命令的参数的字符串数组,Redis使用特定于命令的数据类型进行回复。
注意:此处概述的协议仅用于客户端 - 服务器通信。Redis Cluster使用不同的二进制协议,以便在节点之间交换消息。
网络层:客户端连接到Redis服务器,创建到端口6379的TCP连接。
请求 - 响应模型:Redis接受由不同参数组成的命令。收到命令后,将对其进行协议解析,并将回复发送回客户端。
RESP协议说明
RESP协议是在Redis 1.2中引入的,但它成为了与Redis 2.0中的Redis服务器通信的标准方式。这是您应该在Redis客户端中实现的协议。
RESP实际上是一个支持以下数据类型的序列化协议:Simple Strings、Errors、Integers、Bulk Strings、Arrays。
RESP在Redis中用作请求 - 响应协议的方式如下:
客户端将命令作为Bulk Strings的RESP数组发送到Redis服务器,服务器根据命令实现回复一种RESP类型。在RESP中,某些数据的类型取决于第一个字节:
对于Simple Strings,回复的第一个字节是“+”
对于Errors,回复的第一个字节是“ - ”
对于Integers,回复的第一个字节是“:”
对于Bulk Strings,回复的第一个字节是“$”
对于Arrays,回复的第一个字节是“*”
此外,RESP能够使用指定的Bulk Strings或Array的特殊变体来表示Null值(如:"$-1
" 和 "*-1
" 都表示null)。在RESP中,协议的不同部分始终以“ r n”(CRLF)结束。
结构原理图:
需要的外包依赖:
<dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> </dependencies>
第一步:
将客户端发到服务端的命令,输入流后,进行解码,
再将解完码后的对象根据Command命令编码,不同命令的类通过其类加载器,调用其自身将命令对应的操作输出流到缓存中
package com.hhy; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; public class Protocol { public static Object read(InputStream is) throws IOException { return process(is); } public static Command readCommand(InputStream is) throws Exception { Object o = read(is); // 作为 Server 来说,一定不会收到 "+OK " if (!(o instanceof List)) { throw new Exception("命令必须是 Array 类型"); } List<Object> list = (List<Object>)o; if (list.size() < 1) { throw new Exception("命令元素个数必须大于 1"); } Object o2 = list.remove(0); if (!(o2 instanceof byte[])) { throw new Exception("错误的命令类型"); } byte[] array = (byte[])o2; String commandName = new String(array); String className = String.format("com.hhy.commands.%sCommand", commandName.toUpperCase()); Class<?> cls = Class.forName(className); //不属于这个接口 if (!Command.class.isAssignableFrom(cls)) { throw new Exception("错误的命令"); } Command command = (Command)cls.newInstance(); command.setArgs(list); return command; } private static String processSimpleString(InputStream is) throws IOException { return readLine(is); } private static String processError(InputStream is) throws IOException { return readLine(is); } private static long processInteger(InputStream is) throws IOException { return readInteger(is); } private static byte[] processBulkString(InputStream is) throws IOException { int len = (int)readInteger(is); if (len == -1) { // "$-1 " ==> null return null; } byte[] r = new byte[len]; is.read(r, 0, len); /* for (int i = 0; i < len; i++) { int b = is.read(); r[i] = (byte)b; } */ // "$5 hello "; is.read(); is.read(); return r; } private static List<Object> processArray(InputStream is) throws IOException { int len = (int)readInteger(is); if (len == -1) { // "*-1 " ==> null return null; } List<Object> list = new ArrayList<>(len); for (int i = 0; i < len; i++) { try { list.add(process(is)); } catch (RemoteException e) { list.add(e); } } return list; } private static Object process(InputStream is) throws IOException { int b = is.read(); if (b == -1) { throw new RuntimeException("不应该读到结尾的"); } switch (b) { case '+': return processSimpleString(is); case '-': //考虑异常被当做对象写进数组,所以定义一个额外的对象类,否则就会抛出异常,不会吧异常当做对象 throw new RemoteException(processError(is)); case ':': return processInteger(is); case '$': return processBulkString(is); case '*': return processArray(is); default: throw new RuntimeException("不识别的类型"); } } private static String readLine(InputStream is) throws IOException { boolean needRead = true; StringBuilder sb = new StringBuilder(); int b = -1; while (true) { if (needRead == true) { b = is.read(); if (b == -1) { throw new RuntimeException("不应该读到结尾的"); } } else { needRead = true; } if (b == ' ') { int c = is.read(); if (c == -1) { throw new RuntimeException("不应该读到结尾的"); } if (c == ' ') { break; } if (c == ' ') { sb.append((char) b); b = c; needRead = false; } else { sb.append((char) b); sb.append((char) c); } } else { sb.append((char)b); } } return sb.toString(); } public static long readInteger(InputStream is) throws IOException { boolean isNegative = false; StringBuilder sb = new StringBuilder(); int b = is.read(); if (b == -1) { throw new RuntimeException("不应该读到结尾"); } if (b == '-') { isNegative = true; } else { sb.append((char)b); } while (true) { b = is.read(); if (b == -1) { throw new RuntimeException("不应该读到结尾的"); } if (b == ' ') { int c = is.read(); if (c == -1) { throw new RuntimeException("不应该读到结尾的"); } if (c == ' ') { break; } throw new RuntimeException("没有读到\r\n"); } else { sb.append((char)b); } } long v = Long.parseLong(sb.toString()); if (isNegative) { v = -v; } return v; } public static void writeError(OutputStream os, String message) throws IOException { os.write('-'); os.write(message.getBytes()); os.write(" ".getBytes()); } public static void writeInteger(OutputStream os, long v) throws IOException { // v = 10 //:10 // v = -1 //:-1 os.write(':'); os.write(String.valueOf(v).getBytes()); os.write(" ".getBytes()); } public static void writeArray(OutputStream os, List<?> list) throws Exception { os.write('*'); os.write(String.valueOf(list.size()).getBytes()); os.write(" ".getBytes()); for (Object o : list) { if (o instanceof String) { writeBulkString(os, (String)o); } else if (o instanceof Integer) { writeInteger(os, (Integer)o); } else if (o instanceof Long) { writeInteger(os, (Long)o); } else { throw new Exception("错误的类型"); } } } public static void writeBulkString(OutputStream os, String s) throws IOException { byte[] buf = s.getBytes(); os.write('$'); os.write(String.valueOf(buf.length).getBytes()); os.write(" ".getBytes()); os.write(buf); os.write(" ".getBytes()); } public static void writeNull(OutputStream os) throws IOException { os.write('$'); os.write('-'); os.write('1'); os.write(' '); os.write(' '); } }
命令的接口与实现类(只实现了两类,List与HashMap的增加与查询):
package com.hhy; import java.io.IOException; import java.io.OutputStream; import java.util.List; public interface Command { void setArgs(List<Object> args); void run(OutputStream os) throws IOException; }
lLPUSH:
package com.hhy.commands; import com.hhy.Command; import com.hhy.Database; import com.hhy.Protocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.util.List; public class LPUSHCommand implements Command { private static final Logger logger = LoggerFactory.getLogger(LPUSHCommand.class); private List<Object> args; @Override public void setArgs(List<Object> args) { this.args = args; } @Override public void run(OutputStream os) throws IOException { if (args.size() != 2) { Protocol.writeError(os, "命令至少需要两个参数"); return; } String key = new String((byte[])args.get(0)); String value = new String((byte[])args.get(1)); logger.debug("运行的是 lpush 命令: {} {}", key, value); // 这种方式不是一个很好的线程同步的方式 List<String> list = Database.getList(key); list.add(0, value); logger.debug("插入后数据共有 {} 个", list.size()); Protocol.writeInteger(os, list.size()); } }
LRANGE:
package com.hhy.commands; import com.hhy.Command; import com.hhy.Database; import com.hhy.Protocol; import java.io.IOException; import java.io.OutputStream; import java.util.List; public class LRANGECommand implements Command { private List<Object> args; @Override public void setArgs(List<Object> args) { this.args = args; } @Override public void run(OutputStream os) throws IOException { String key = new String((byte[])args.get(0)); int start = Integer.parseInt(new String((byte[])args.get(1))); int end = Integer.parseInt(new String((byte[])args.get(2))); List<String> list = Database.getList(key); if (end < 0) { end = list.size() + end; } List<String> result = list.subList(start, end + 1); try { Protocol.writeArray(os, result); } catch (Exception e) { e.printStackTrace(); } } }
HGET:
package com.hhy.commands; import com.hhy.Command; import com.hhy.Database; import com.hhy.Protocol; import java.io.IOException; import java.io.OutputStream; import java.util.List; import java.util.Map; public class HGETCommand implements Command { private List<Object> args; @Override public void setArgs(List<Object> args) { this.args = args; } @Override public void run(OutputStream os) throws IOException { String key = new String((byte[])args.get(0)); String field = new String((byte[])args.get(1)); Map<String, String> hash = Database.getHashes(key); String value = hash.get(field); if (value != null) { Protocol.writeBulkString(os, value); } else { Protocol.writeNull(os); } } }
HSET:
package com.hhy.commands; import com.hhy.Command; import com.hhy.Database; import com.hhy.Protocol; import java.io.IOException; import java.io.OutputStream; import java.util.List; import java.util.Map; public class HSETCommand implements Command { private List<Object> args; @Override public void setArgs(List<Object> args) { this.args = args; } @Override public void run(OutputStream os) throws IOException { String key = new String((byte[])args.get(0)); String field = new String((byte[])args.get(1)); String value = new String((byte[])args.get(2)); Map<String, String> hash =Database.getHashes(key); boolean isUpdate = hash.containsKey(field); hash.put(field, value); if (isUpdate) { Protocol.writeInteger(os, 0); } else { Protocol.writeInteger(os, 1); } } }
自定义异常类:
package com.hhy.exceptions; public class RemoteException extends Exception { public RemoteException() { } public RemoteException(String message) { super(message); } public RemoteException(String message, Throwable cause) { super(message, cause); } public RemoteException(Throwable cause) { super(cause); } public RemoteException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } }
写到内存中去存储:
package com.hhy; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class Database { private static Map<String, List<String>> lists = new HashMap<>(); private static Map<String, Map<String, String>> hashes = new HashMap<>(); public static List<String> getList(String key) { /* List<String> list = lists.computeIfAbsent(key, k -> { return new ArrayList<>(); }); */ List<String> list = lists.get(key); if (list == null) { list = new ArrayList<>(); lists.put(key, list); } return list; } public static Map<String, String> getHashes(String key) { Map<String, String> hash = hashes.get(key); if (hash == null) { hash = new HashMap<>(); hashes.put(key, hash); } return hash; } }
多线程编程:
package com.hhy; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class MutliThread implements Runnable { private Socket client; public MutliThread(Socket client) { this.client = client; } @Override public void run() { while (true) { try { InputStream inputStream = client.getInputStream(); OutputStream outputStream = client.getOutputStream(); while (true) { Command command = Protocol.readCommand(inputStream); command.run(outputStream); } } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } } }
server服务端:
package com.hhy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server { private static final Logger logger = LoggerFactory.getLogger(Server.class); public static void main(String[] args) throws IOException{ int port = 6379; ServerSocket serverSocket = new ServerSocket(port);//ServerScoket监听端口 System.out.println("服务器等待连接..."+serverSocket.getLocalSocketAddress()); ExecutorService executorService = Executors.newFixedThreadPool(20); while(true){ Socket client = serverSocket.accept(); System.out.println("有客户端连接到服务器..."+client.getRemoteSocketAddress()); executorService.execute(new MutliThread(client)); } } }
如何根据二进制字节流解析出命令名称?
项目中实现了一个 Protocol 类,专门用于协议解析,将二进制流转为Java对象。
如何根据命令名称获取到命令所对应的对象?
根据命令名称找到对应的类名称,这里采用了约定俗成的办法,把类名称和命令名称起成一样的。
根据类名称获取指定的对象,很容易想到通过反射。具体做法是每次命令创建一个新的对象,相同的命令共用同一个对象(通过单例模式实现)。