zoukankan      html  css  js  c++  java
  • 编写自己的handler简单操作redis

      RESP是Redis Serialization Protocol的简称,也就是专门为redis设计的一套序列化协议。这个协议比较简单,简单的说就是发送请求的时候按Redis 约定的数据格式进行发送,解析数据的时候按redis规定的响应数据格式进行相应。

      无论是jedis还是lettuce, 最终发给redis的数据与接收到redis的响应数据的格式必须满足redis的协议要求。

    1. RedisClient 类

    package cn.xm.netty.example.redis3;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.concurrent.GenericFutureListener;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    
    public class RedisClient {
    
        private static final String HOST = System.getProperty("host", "192.168.145.139");
        private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new RedisClientHandler());
                            }
                        });
    
                // Start the connection attempt.
                Channel ch = b.connect(HOST, PORT).sync().channel();
    
                // Read commands from the stdin.
                System.out.println("Enter Redis commands (quit to end)");
                ChannelFuture lastWriteFuture = null;
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                for (; ; ) {
                    final String input = in.readLine();
                    final String line = input != null ? input.trim() : null;
                    if (line == null || "quit".equalsIgnoreCase(line)) { // EOF or "quit"
                        ch.close().sync();
                        break;
                    } else if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                        continue;
                    }
                    // Sends the received line to the server.
                    lastWriteFuture = ch.writeAndFlush(line);
                    lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                System.err.print("write failed: ");
                                future.cause().printStackTrace(System.err);
                            }
                        }
                    });
                }
    
                // Wait until all messages are flushed before closing the channel.
                if (lastWriteFuture != null) {
                    lastWriteFuture.sync();
                }
            } finally {
                group.shutdownGracefully();
            }
        }
    }

      这个类比较简单就是一直for 循环等待控制台输入数据。并且添加的handler 只有一个handler。 也就是输入输出都是一个handler。

    2.  RedisClientHandler

    package cn.xm.netty.example.redis3;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPromise;
    import io.netty.util.CharsetUtil;
    import org.apache.commons.lang3.StringUtils;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * An example Redis client handler. This handler read input from STDIN and write output to STDOUT.
     */
    public class RedisClientHandler extends ChannelDuplexHandler {
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            // 转换发出去的数据格式
            msg = rehandleRequest(msg);
            ctx.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
        }
    
        /**
         * 重新处理消息,处理为 RESP 认可的数据
         * set foo bar
         * 对应下面数据
         * *3
    $3
    SET
    $3
    foo
    $3
    bar
    
         */
        private String rehandleRequest(Object msg) {
            String result = msg.toString().trim();
            String[] params = result.split(" ");
            List<String> allParam = new ArrayList<>();
            Arrays.stream(params).forEach(s -> {
                allParam.add("$" + s.length() + "
    " + s + "
    "); // 参数前$length
    , 参数后增加 
    
            });
            allParam.add(0, "*" + allParam.size() + "
    ");
            StringBuilder stringBuilder = new StringBuilder();
            allParam.forEach(p -> {
                stringBuilder.append(p);
            });
            return stringBuilder.toString();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf byteBuf = (ByteBuf) msg;
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            String result = new String(bytes);
            // 转换接受到的数据格式
            result = rehandleResponse(result).toString();
            System.out.println(result);
        }
    
        /**
         * 重新处理响应消息
         */
        private Object rehandleResponse(String result) {
            // 状态恢复 - “+OK
    ”
            if (result.startsWith("+")) {
                return result.substring(1, result.length() - 2);
            }
    
            // 错误回复(error reply)的第一个字节是 "-"。例如 `flushallE` 返回的 `-ERR unknown command 'flushallE'
    `
            if (result.startsWith("-")) {
                return result.substring(1, result.length() - 2);
            }
    
            // 整数回复(integer reply)的第一个字节是 ":"。 例如 `llen mylist` 查看list 大小返回的 `:3
    `
            if (result.startsWith(":")) {
                return result.substring(1, result.length() - 2);
            }
    
            // 批量回复(bulk reply)的第一个字节是 "$", 例如:  `get foo` 返回的结果为 `$3
    bar
    `
            if (result.startsWith("$")) {
                result = StringUtils.substringAfter(result, "
    ");
                return StringUtils.substringBeforeLast(result, "
    ");
            }
    
            // 多条批量回复(multi bulk reply)的第一个字节是 "*", 例如: *2
    $3
    foo
    $4
    name
    
            if (result.startsWith("*")) {
                result = StringUtils.substringAfter(result, "
    ");
                String[] split = result.split("\$\d
    ");
                List<String> collect = Arrays.stream(split).filter(tmpStr -> StringUtils.isNotBlank(tmpStr)).collect(Collectors.toList());
                List<String> resultList = new ArrayList<>(collect.size());
                collect.forEach(str1 -> {
                    resultList.add(StringUtils.substringBeforeLast(str1, "
    "));
                });
                return resultList;
            }
    
            return "unknow result";
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.err.print("exceptionCaught: ");
            cause.printStackTrace(System.err);
            ctx.close();
        }
    
    }

      这个handler 继承 ChannelDuplexHandler 复用处理器,也就是既可以作为入站处理读取数据,也可以作为出站处理输出数据。输出和输入的时候都是根据redis 的协议标准进行了一下消息的转换。

    3. 测试

    1. 用strace 启动redis, 监测调用指令

    [root@localhost redistest]# strace -ff -o out ../redis-5.0.4/src/redis-server ../redis-5.0.4/redis.conf

    2. 命令行测试结果如下

    3. 从out文件查看redis 接收的命令:

    [root@localhost redistest]# grep mytest ./*
    ./out.8384:read(8, "*3
    $3
    set
    $9
    mytestkey
    $11
    "..., 16384) = 46
    ./out.8384:read(8, "*3
    $3
    ttl
    $9
    mytestkey
    $7
    "..., 16384) = 41
    ./out.8384:read(8, "*3
    $6
    expire
    $9
    mytestkey
    $"..., 16384) = 43
    ./out.8384:read(8, "*2
    $3
    ttl
    $9
    mytestkey
    ", 16384) = 28
    ./out.8384:read(8, "*2
    $4
    type
    $9
    mytestkey
    ", 16384) = 29

    注意: redis 接收到的数据必须以 结尾,否则redis 不会进行响应。

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    5-1 Leetcode中和链表相关的问题
    4-7 带有尾指针的链表:使用链表实现队列
    4.6 使用链表实现栈
    4.5 链表元素的删除
    4.4 链表的遍历、查询和修改
    4.3 为链表设置虚拟头结点dummyhead
    4.2在链表中添加元素
    4.1链表
    mybatis 力量操作参数为List的非空校验
    linux 运行和停止jar的shell 脚本
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/15128192.html
Copyright © 2011-2022 走看看