zoukankan      html  css  js  c++  java
  • 关于Redis stream的使用

    由于使用的laravel框架,所以使用了框架自带的函数

    1. 这里先创建一个链接,给一个stream里面添加数据

    $streamKey = 'test:stream:queue';
    
            $redis = new Redis();
            $redis->connect('127.0.0.1');
    
            for ($i = 0; $i < 100; $i++) {
                /**
                 * 队列名
                 * *: 表示由Redis自己生成消息ID:规则为[毫秒时间戳+自增数]
                 * 存储的数据
                 */
                $xAddResult = $redis->xAdd($streamKey, '*', ['field-'.$i => 'value:'.$i*2]);
            }

    2.删除队列中的某一条消息

    /**
     * 删除消息
     * 队列名
     * 消息ID
     */
     $xDelResult = $redis->xDel($streamKey, ['1609131229884-0']);
    

    3.查看队列中的消息

    /**
      * 取出所有的消息
      * 队列名
      * 消息开始ID: - 不限制开始ID
      * 消息结束ID: + 表示不限制
      */
      $streamResult = $redis->xRange($streamKey, '-', '+');
      dd($streamResult);
    

      

    4.此时如果要消费队列中的消息,需要先创建一个group与队列关联起来,才可以消费队列中的消息

    /**
      * 创建一个消费组
      * 操作类型:['HELP', 'SETID', 'DELGROUP', 'CREATE', 'DELCONSUMER']
      * 队列名
      * 消费者 : 这个时候自己随便起名字就可以
      * 消息ID : 0 表示从头开始 $ 表示不接收老的消息
      */
      $xGroupResult = $redis->xGroup('CREATE', $streamKey, $streamKey.':group_1', 0);
      dd($xGroupResult);
    

      

    5.获取队列中的消息

    /**
             * group
             * 消费者
             * [队列名 => '>' : 特殊>ID,这意味着使用者只想接收从未传递给任何其他使用者的消息。这只是意味着,给我新消息。
             *  队列名 => '0' : 任何其他ID(即0或任何其他有效ID或不完整的ID(仅毫秒时间部分))将具有以下效果:返回正在等待用户发送的ID大于提供的ID的命令的条目。因此,基本上,如果ID不是>,那么该命令将只允许客户端访问其挂起的条目:传递给它的消息,但尚未确认。]
             * 一次性取多少条消息
             */
            $xReadGroupResult = $redis->xReadGroup($streamKey.':group_1', 'consumerA', [$streamKey => '>'], 1);
    

      

    6.从消费者组内读取消息并处理完成后,需确认该条消息已处理

    /**
             * 确认消息已处理
             * 队列名
             * 消费者组
             * 消息ID
             */
            $xAckResult = $redis->xAck($streamKey, $streamKey.':group_1', ['1609131229885-0']);
            dd($xAckResult);
    

      

    7. 用来获消费组或消费内消费者的未处理完毕的消息。

    /**
             * 用来获消费组或消费内消费者的未处理完毕的消息。
             * 队列名
             * 消费者组
             */
            $pendingResult = $redis->xPending($streamKey, $streamKey.':group_1');
            dd($pendingResult);
    

      

     

    其他更多的命令可以参考 Redis stream

  • 相关阅读:
    Js 获取当前时间
    C# 将datatable 转换json
    easyui 文本框验证长度
    js 为label标签和div标签赋值
    easy ui datagrid 设置冻结列
    Ext Grid 加载超时设置timeout: 180000
    jQuery uploadify-v3.1 批量上传
    MVC5+EF6+BootStrap3.3.5 博客系统之项目搭建(一)
    C# list 筛选FindAll
    ExtJS 添加图标icon
  • 原文地址:https://www.cnblogs.com/jing1208/p/14201069.html
Copyright © 2011-2022 走看看