zoukankan      html  css  js  c++  java
  • PHP使用Redis的Pub/Sub(发布订阅)命令

    1.概念

    名称含义
    channel 频道:生产者和消费者直接操作的对象
    publish 生产者:向channel发送消息
    subscribe 消费者:订阅一个或多个channel
    psubscribe 消费者:匹配订阅一个或多个channel
    punsubscribe 退订:匹配退订,无参数则退订全部channel
    unsubscribe 退订:退订指定的channel,无参数则退订全部channel
    pubsub 列出当前活动channel(至少有一个订阅)

    2.注意事项

    1.生产者publish消息时打开一个连接,publish后连接可以立即关闭
    2.channel只接收publish发送的消息,自身不存储消息,如果channel没有被订阅,则消息丢弃
    3.订阅的消费者需要一直在线,阻塞获取消息,连接断开表示立即退订

    3.使用rawCommand命令实现发布订阅

    rawCommand是php-redis扩展中提供的命令,可以向redis发送任何原生的命令
    1.消费者订阅Subscribe.php
    消费者需要创建redis长连接,并且设置set_time_limit和default_socket_timeout,以确保阻塞获取消息过程php不超时,socket连接不超时

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/23
     * Time: 11:29
     */
    //设置php脚本执行时间
    set_time_limit(0);
    //设置socket连接超时时间
    ini_set('default_socket_timeout', -1);
    //声明测试频道名称
    $channelName = "testPubSub";
    $channelName2 = "testPubSub2";
    try {
        $redis = new Redis();
        //建立一个长链接
        $redis->pconnect('192.168.75.132', 6379);
        //阻塞获取消息
        while (true){
            //构建命令参数
            $param = array('subscribe', $channelName, $channelName2);
            //使用call_user_func_array回调执行命令
            $ret = call_user_func_array(array($redis, 'rawCommand'), $param);
            //如果结果是消息结构
            if (isset($ret[0]) && $ret[0] == 'message'){
                //输出消息频道和消息内容
                echo "channel:".$ret[1].",message:".$ret[2]."
    ";
            } else {
                //没有消息休眠1秒
                sleep(1);
            }
        }
    } catch (Exception $e){
        echo $e->getMessage();
    }

    2.生产者发送消息Publish.php

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/23
     * Time: 11:59
     */
    $channelName = "testPubSub";
    $channelName2 = "testPubSub2";
    //向指定频道发送消息
    try {
        $redis = new Redis();
        $redis->connect('192.168.75.132', 6379);
        for ($i=0;$i<5;$i++){
            $data = array('key' => 'key'.$i, 'data' => 'testdata');
            $param = array('publish', $channelName, json_encode($data));
            $ret = call_user_func_array(array($redis, 'rawCommand'), $param);
            print_r($ret);
        }
    } catch (Exception $e){
        echo $e->getMessage();
    }

    3.执行消费者订阅,开始阻塞获取消息php Subscribe.php
    4.执行生产者,开始发送消息php Publish.php

    php .Publish.php
    22222
    #返回执行结果:频道的订阅数量

    查看消费者终端

    php .Subscribe.php
    channel:testPubSub,message:{"key":"key0","data":"testdata"}
    channel:testPubSub,message:{"key":"key1","data":"testdata"}
    channel:testPubSub,message:{"key":"key2","data":"testdata"}
    channel:testPubSub,message:{"key":"key3","data":"testdata"}
    channel:testPubSub,message:{"key":"key4","data":"testdata"}

    消费者获取到了生产者发送的消息。

    4.直接使用php-redis扩展提供的方法实现发布订阅

    1.消费者订阅Subscribe.php

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/23
     * Time: 11:29
     */
    //设置php脚本执行时间
    set_time_limit(0);
    //设置socket连接超时时间
    ini_set('default_socket_timeout', -1);
    //声明测试频道名称
    $channelName = "testPubSub";
    $channelName2 = "testPubSub2";
    try {
        $redis = new Redis();
        //建立一个长链接
        $redis->pconnect('192.168.75.132', 6379);
        //阻塞获取消息
        $redis->subscribe(array($channelName, $channelName2), function ($redis, $chan, $msg){
            echo "channel:".$chan.",message:".$msg."
    ";
        });
    } catch (Exception $e){
        echo $e->getMessage();
    }

    2.生产者发送消息Publish.php

    <?php
    /**
     * Created by PhpStorm.
     * User: jmsite.cn
     * Date: 2019/1/23
     * Time: 11:59
     */
    $channelName = "testPubSub";
    $channelName2 = "testPubSub2";
    //向指定频道发送消息
    try {
        $redis = new Redis();
        $redis->connect('192.168.75.132', 6379);
        for ($i=0;$i<5;$i++){
            $data = array('key' => 'key'.$i, 'data' => 'testdata');
            $ret = $redis->publish($channelName, json_encode($data));
            print_r($ret);
        }
    } catch (Exception $e){
        echo $e->getMessage();
    }

    3.执行消费者订阅,开始阻塞获取消息php Subscribe.php
    4.执行生产者,开始发送消息php Publish.php

    php .Publish.php
    22222
    #返回执行结果:频道的订阅数量

    查看消费者终端

    php .Subscribe.php
    channel:testPubSub,message:{"key":"key0","data":"testdata"}
    channel:testPubSub,message:{"key":"key1","data":"testdata"}
    channel:testPubSub,message:{"key":"key2","data":"testdata"}
    channel:testPubSub,message:{"key":"key3","data":"testdata"}
    channel:testPubSub,message:{"key":"key4","data":"testdata"}

    消费者同样获取到了生产者发送的消息。
    退订和查看活动channel命令与生产者和消费者类似,只是参数不同而已。

     

     

     

     

     

    原文地址: https://www.jmsite.cn/blog-586.html

  • 相关阅读:
    测试 多线程 实现 callable 带返回值
    给定一个 hashMap 最终输出最大值的键
    正则判断输入的字符(英文、数字、空格、其他)的个数
    当返回值为json字符串时 如何获得其中的json数组
    thread run 和 start 的区别
    docker 构建dockerfile
    jsonp 跨域
    springsession 实现session 共享
    通过反射获得 spring 的 RequestMapping value值
    redis 集群搭建 以及 报错解决
  • 原文地址:https://www.cnblogs.com/liuzhongchao/p/10855359.html
Copyright © 2011-2022 走看看