zoukankan      html  css  js  c++  java
  • mini Redis(项目 二)

    一个仿Redis的内存数据库(主要用来做命令解析)服务端,

     客户端使用的开源工具 :  https://dom4j.github.io/      github:https://github.com/hehaoyuan/mini-Redis

    Redis 简介:
    Redis是一个开源的底层使用C语言编写的key-value内存数据库。可用于缓存数据、事件发布订阅、高速队列等场景,而且支持丰富的数据类型:string(字符串)、hash(哈希)、list(列表)、set(无序集合)、zset(有序集合)


    使用场景:

    使用场景
    随着数据量的增长,MySQL已经满足不了大型互联网类应用的需求。因此,Redis基于内存存储数据,在某些场景下,就会大大提高效率。

    缓存:对于热点数据,缓存以后可能读取数十万次,因此对于热点数据,不但缓存的价值非常大,而且当总数据量比较大的时候直接从数据库中查询会比较影响性能。例如:对经常需要查询且变动不是很频繁的数据,可以考虑基于Redis实现缓存。
    会话缓存:Redis还可以进行会话缓存。例如:将web session存放在Redis中。
    计数器:因为Redis具有原子性,所以在某些方面可以避免并发问题,比如:统计点击率、点赞率、收藏率等。
    消息队列:Redis能作为一个很好的消息队列来使用,依赖List类型利用LPUSH命令将数据添加到链表头部,通过BRPOP命令将元素从链表尾部取出。
    社交列表:社交属性相关的列表信息,例如,用户点赞列表、用户分享列表、用户收藏列表、用户关注列表、用户粉丝列表等,使用Hash类型数据结构是个不错的选择。
    最新动态:按照时间顺序排列的最新动态,也是一个很好的应用,可以使用Sorted Set类型的分数权重存储时间戳进行排序。


    Redis数据类型
    String(字符串)

    String是Redis的最基本数据结构,以一个键和一个值存储在Redis内部,类似java的Map结构,可以通过键去找值。

    Hash(哈希)

    Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象,类似Java里面的Map<String, Object>。

    List(列表)

    根据插入顺序排序的字符串元素的集合,底层实现是链表。

    Set(集合)

    Redis的Set是string类型的无序集合,它是通过HashTable实现的。

    zset(sorted set:有序集合)

    Redis zset和set一样也是string类型元素的集合,且不允许重复。但每个字符串元素与浮点数值相关联,称为分数,元素总是按其分数排序,可以检索一系列元素。

    Redis协议规范
    Redis客户端使用名为RESP(REdis序列化协议)的协议与Redis服务器通信。虽然该协议是专为Redis设计的,但它可以用于其他客户端 - 服务器软件项目。

    RESP可以序列化不同的数据类型,如整数,字符串,数组,还有一种特定的错误类型。请求从客户端发送到Redis服务器,作为表示要执行的命令的参数的字符串数组,Redis使用特定于命令的数据类型进行回复。

    注意:此处概述的协议仅用于客户端 - 服务器通信。Redis Cluster使用不同的二进制协议,以便在节点之间交换消息。

    网络层:客户端连接到Redis服务器,创建到端口6379的TCP连接。

    请求 - 响应模型:Redis接受由不同参数组成的命令。收到命令后,将对其进行协议解析,并将回复发送回客户端。

    RESP协议说明
    RESP协议是在Redis 1.2中引入的,但它成为了与Redis 2.0中的Redis服务器通信的标准方式。这是您应该在Redis客户端中实现的协议。

    RESP实际上是一个支持以下数据类型的序列化协议:Simple Strings、Errors、Integers、Bulk Strings、Arrays。

    RESP在Redis中用作请求 - 响应协议的方式如下:

    客户端将命令作为Bulk Strings的RESP数组发送到Redis服务器,服务器根据命令实现回复一种RESP类型。在RESP中,某些数据的类型取决于第一个字节:

    对于Simple Strings,回复的第一个字节是“+”
    对于Errors,回复的第一个字节是“ - ”
    对于Integers,回复的第一个字节是“:”
    对于Bulk Strings,回复的第一个字节是“$”
    对于Arrays,回复的第一个字节是“*”
    此外,RESP能够使用指定的Bulk Strings或Array的特殊变体来表示Null值(如:"$-1 " 和 "*-1 " 都表示null)。在RESP中,协议的不同部分始终以“ r n”(CRLF)结束。


    结构原理图:

     

    需要的外包依赖:

        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.26</version>
            </dependency>
    
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.3</version>
            </dependency>
        </dependencies>

    第一步:

    将客户端发到服务端的命令,输入流后,进行解码,

    再将解完码后的对象根据Command命令编码,不同命令的类通过其类加载器,调用其自身将命令对应的操作输出流到缓存中

    package com.hhy;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.rmi.RemoteException;
    import java.util.ArrayList;
    import java.util.List;
    
    public class Protocol {
        public static Object read(InputStream is) throws IOException {
            return process(is);
        }
    
        public static Command readCommand(InputStream is) throws Exception {
            Object o  = read(is);
            // 作为 Server 来说,一定不会收到 "+OK
    "
            if (!(o instanceof List)) {
                throw new Exception("命令必须是 Array 类型");
            }
    
            List<Object> list = (List<Object>)o;
            if (list.size() < 1) {
                throw new Exception("命令元素个数必须大于 1");
            }
    
            Object o2 = list.remove(0);
            if (!(o2 instanceof byte[])) {
                throw new Exception("错误的命令类型");
            }
    
            byte[] array = (byte[])o2;
            String commandName = new String(array);
            String className = String.format("com.hhy.commands.%sCommand", commandName.toUpperCase());
            Class<?> cls = Class.forName(className);
            //不属于这个接口
            if (!Command.class.isAssignableFrom(cls)) {
                throw new Exception("错误的命令");
            }
            Command command = (Command)cls.newInstance();
            command.setArgs(list);
    
            return command;
        }
    
        private static String processSimpleString(InputStream is) throws IOException {
            return readLine(is);
        }
    
        private static String processError(InputStream is) throws IOException {
            return readLine(is);
        }
    
        private static long processInteger(InputStream is) throws IOException {
            return readInteger(is);
        }
    
        private static byte[] processBulkString(InputStream is) throws IOException {
            int len = (int)readInteger(is);
            if (len == -1) {
                // "$-1
    "    ==> null
                return null;
            }
    
            byte[] r = new byte[len];
            is.read(r, 0, len);
            /*
            for (int i = 0; i < len; i++) {
                int b = is.read();
                r[i] = (byte)b;
            }
            */
    
            // "$5
    hello
    ";
            is.read();
            is.read();
    
            return r;
        }
    
        private static List<Object> processArray(InputStream is) throws IOException {
            int len = (int)readInteger(is);
            if (len == -1) {
                // "*-1
    "        ==> null
                return null;
            }
    
            List<Object> list = new ArrayList<>(len);
            for (int i = 0; i < len; i++) {
                try {
                    list.add(process(is));
                } catch (RemoteException e) {
                    list.add(e);
                }
            }
    
            return list;
        }
        private static Object process(InputStream is) throws IOException {
            int b = is.read();
            if (b == -1) {
                throw new RuntimeException("不应该读到结尾的");
            }
    
            switch (b) {
                case '+':
                    return processSimpleString(is);
                case '-':
                    //考虑异常被当做对象写进数组,所以定义一个额外的对象类,否则就会抛出异常,不会吧异常当做对象
                    throw new RemoteException(processError(is));
                case ':':
                    return processInteger(is);
                case '$':
                    return processBulkString(is);
                case '*':
                    return processArray(is);
                default:
    
                    throw new RuntimeException("不识别的类型");
            }
        }
    
        private static String readLine(InputStream is) throws IOException {
            boolean needRead = true;
            StringBuilder sb = new StringBuilder();
            int b = -1;
            while (true) {
                if (needRead == true) {
                    b = is.read();
                    if (b == -1) {
                        throw new RuntimeException("不应该读到结尾的");
                    }
                } else {
                    needRead = true;
                }
    
                if (b == '
    ') {
                    int c = is.read();
                    if (c == -1) {
                        throw new RuntimeException("不应该读到结尾的");
                    }
    
                    if (c == '
    ') {
                        break;
                    }
    
                    if (c == '
    ') {
                        sb.append((char) b);
                        b = c;
                        needRead = false;
                    } else {
                        sb.append((char) b);
                        sb.append((char) c);
                    }
                } else {
                    sb.append((char)b);
                }
            }
            return sb.toString();
        }
    
        public static long readInteger(InputStream is) throws IOException {
            boolean isNegative = false;
            StringBuilder sb = new StringBuilder();
            int b = is.read();
            if (b == -1) {
                throw new RuntimeException("不应该读到结尾");
            }
    
            if (b == '-') {
                isNegative = true;
            } else {
                sb.append((char)b);
            }
    
            while (true) {
                b = is.read();
                if (b == -1) {
                    throw new RuntimeException("不应该读到结尾的");
                }
    
                if (b == '
    ') {
                    int c = is.read();
                    if (c == -1) {
                        throw new RuntimeException("不应该读到结尾的");
                    }
    
                    if (c == '
    ') {
                        break;
                    }
    
                    throw new RuntimeException("没有读到\r\n");
                } else {
                    sb.append((char)b);
                }
            }
    
            long v = Long.parseLong(sb.toString());
            if (isNegative) {
                v = -v;
            }
    
            return v;
        }
    
        public static void writeError(OutputStream os, String message) throws IOException {
            os.write('-');
            os.write(message.getBytes());
            os.write("
    ".getBytes());
        }
    
        public static void writeInteger(OutputStream os, long v) throws IOException {
            // v = 10
            //:10
    
    
            // v = -1
            //:-1
    
    
            os.write(':');
            os.write(String.valueOf(v).getBytes());
            os.write("
    ".getBytes());
        }
    
        public static void writeArray(OutputStream os, List<?> list) throws Exception {
            os.write('*');
            os.write(String.valueOf(list.size()).getBytes());
            os.write("
    ".getBytes());
            for (Object o : list) {
                if (o instanceof String) {
                    writeBulkString(os, (String)o);
                } else if (o instanceof Integer) {
                    writeInteger(os, (Integer)o);
                } else if (o instanceof Long) {
                    writeInteger(os, (Long)o);
                } else {
                    throw new Exception("错误的类型");
                }
            }
        }
    
        public static void writeBulkString(OutputStream os, String s) throws IOException {
            byte[] buf = s.getBytes();
            os.write('$');
            os.write(String.valueOf(buf.length).getBytes());
            os.write("
    ".getBytes());
            os.write(buf);
            os.write("
    ".getBytes());
        }
    
        public static void writeNull(OutputStream os) throws IOException {
            os.write('$');
            os.write('-');
            os.write('1');
            os.write('
    ');
            os.write('
    ');
        }
    }

    命令的接口与实现类(只实现了两类,List与HashMap的增加与查询):

    package com.hhy;
    
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;
    
    public interface Command {
        void setArgs(List<Object> args);
    
        void run(OutputStream os) throws IOException;
    }

    lLPUSH:

    package com.hhy.commands;
    
    import com.hhy.Command;
    import com.hhy.Database;
    import com.hhy.Protocol;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;
    
    public class LPUSHCommand implements Command {
        private static final Logger logger = LoggerFactory.getLogger(LPUSHCommand.class);
        private List<Object> args;
    
        @Override
        public void setArgs(List<Object> args) {
            this.args = args;
        }
    
        @Override
        public void run(OutputStream os) throws IOException {
            if (args.size() != 2) {
                Protocol.writeError(os, "命令至少需要两个参数");
                return;
            }
            String key = new String((byte[])args.get(0));
            String value = new String((byte[])args.get(1));
            logger.debug("运行的是 lpush 命令: {} {}", key, value);
    
            // 这种方式不是一个很好的线程同步的方式
            List<String> list = Database.getList(key);
            list.add(0, value);
    
            logger.debug("插入后数据共有 {} 个", list.size());
    
            Protocol.writeInteger(os, list.size());
        }
    }

    LRANGE:

    package com.hhy.commands;
    
    import com.hhy.Command;
    import com.hhy.Database;
    import com.hhy.Protocol;
    
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;
    
    public class LRANGECommand implements Command {
        private List<Object> args;
    
        @Override
        public void setArgs(List<Object> args) {
            this.args = args;
        }
    
        @Override
        public void run(OutputStream os) throws IOException {
            String key = new String((byte[])args.get(0));
            int start = Integer.parseInt(new String((byte[])args.get(1)));
            int end = Integer.parseInt(new String((byte[])args.get(2)));
    
            List<String> list = Database.getList(key);
            if (end < 0) {
                end = list.size() + end;
            }
            List<String> result = list.subList(start, end + 1);
            try {
                Protocol.writeArray(os, result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    HGET:

    package com.hhy.commands;
    
    import com.hhy.Command;
    import com.hhy.Database;
    import com.hhy.Protocol;
    
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;
    import java.util.Map;
    
    public class HGETCommand implements Command {
        private List<Object> args;
    
        @Override
        public void setArgs(List<Object> args) {
            this.args = args;
        }
    
        @Override
        public void run(OutputStream os) throws IOException {
            String key = new String((byte[])args.get(0));
            String field = new String((byte[])args.get(1));
    
            Map<String, String> hash = Database.getHashes(key);
            String value = hash.get(field);
            if (value != null) {
                Protocol.writeBulkString(os, value);
            } else {
                Protocol.writeNull(os);
            }
        }
    }

    HSET:

    package com.hhy.commands;
    
    import com.hhy.Command;
    import com.hhy.Database;
    import com.hhy.Protocol;
    
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.List;
    import java.util.Map;
    
    public class HSETCommand implements Command {
        private List<Object> args;
    
        @Override
        public void setArgs(List<Object> args) {
            this.args = args;
        }
    
        @Override
        public void run(OutputStream os) throws IOException {
            String key = new String((byte[])args.get(0));
            String field = new String((byte[])args.get(1));
            String value = new String((byte[])args.get(2));
            Map<String, String> hash =Database.getHashes(key);
            boolean isUpdate = hash.containsKey(field);
            hash.put(field, value);
            if (isUpdate) {
                Protocol.writeInteger(os, 0);
            } else {
                Protocol.writeInteger(os, 1);
            }
        }
    }

    自定义异常类:

    package com.hhy.exceptions;
    
    public class RemoteException extends Exception {
        public RemoteException() {
        }
    
        public RemoteException(String message) {
            super(message);
        }
    
        public RemoteException(String message, Throwable cause) {
            super(message, cause);
        }
    
        public RemoteException(Throwable cause) {
            super(cause);
        }
    
        public RemoteException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
            super(message, cause, enableSuppression, writableStackTrace);
        }
    }

    写到内存中去存储:

    package com.hhy;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class Database {
        private static Map<String, List<String>> lists = new HashMap<>();
        private static Map<String, Map<String, String>> hashes = new HashMap<>();
    
    
        public static List<String> getList(String key) {
            /*
            List<String> list = lists.computeIfAbsent(key, k -> {
                return new ArrayList<>();
            });
             */
    
            List<String> list =  lists.get(key);
            if (list == null) {
                list = new ArrayList<>();
                lists.put(key, list);
            }
    
            return list;
        }
    
        public static Map<String, String> getHashes(String key) {
            Map<String, String> hash =  hashes.get(key);
            if (hash == null) {
                hash = new HashMap<>();
                hashes.put(key, hash);
            }
    
            return hash;
        }
    }

    多线程编程:

    package com.hhy;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    
    
    public class MutliThread implements Runnable {
    
        private Socket client;
    
        public MutliThread(Socket client) {
            this.client = client;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    InputStream inputStream = client.getInputStream();
                    OutputStream outputStream = client.getOutputStream();
    
                    while (true) {
                        Command command = Protocol.readCommand(inputStream);
                        command.run(outputStream);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    server服务端:

    package com.hhy;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Server {
        private static final Logger logger = LoggerFactory.getLogger(Server.class);
        
    
        public static void main(String[] args) throws IOException{
            int port = 6379;
    
            ServerSocket serverSocket = new ServerSocket(port);//ServerScoket监听端口
    
            System.out.println("服务器等待连接..."+serverSocket.getLocalSocketAddress());
    
            ExecutorService executorService = Executors.newFixedThreadPool(20);
    
            while(true){
    
                Socket client = serverSocket.accept();
    
                System.out.println("有客户端连接到服务器..."+client.getRemoteSocketAddress());
    
                executorService.execute(new MutliThread(client));
            }
        }
    }

    如何根据二进制字节流解析出命令名称?
    项目中实现了一个 Protocol 类,专门用于协议解析,将二进制流转为Java对象。

    如何根据命令名称获取到命令所对应的对象?
    根据命令名称找到对应的类名称,这里采用了约定俗成的办法,把类名称和命令名称起成一样的。
    根据类名称获取指定的对象,很容易想到通过反射。具体做法是每次命令创建一个新的对象,相同的命令共用同一个对象(通过单例模式实现)。

  • 相关阅读:
    绿豆加速器
    电脑派位系统(新生入学摇号) v2016
    硬盘安装win10
    msbuild
    async
    win sshd
    Ftp软件
    nginx basic auth 登陆验证模块
    深入理解docker的link机制
    Docker Compose to CoreOS
  • 原文地址:https://www.cnblogs.com/hetaoyuan/p/11331589.html
Copyright © 2011-2022 走看看