zoukankan      html  css  js  c++  java
  • RabbitMq的使用--一对一,一对多,队列,消息持久化-轮询,公平调度,自动,手动响应篇

    //生产者
    <?php namespace app abbit; //require_once __DIR__ . '/autoload.php'; //因为我自己使用的是tp框架 所以我在这里不需要再加载这个类了 use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; class Send { public function connect($num) { //连接rabbit $connection = new AMQPStreamConnection('***.**.***.***', 5672, 'mq用户名', 'mq密码'); //创建一个通道 $channel = $connection->channel(); //创建队列 $channel->queue_declare('hello', false, false, false, false); // 1.队列名 2.未知3.队列是否持久化 false:队列在内存中,服务器挂了 队列就没了 true:服务器挂了,队列重新生成 注意:只是队列持久化,不是队列仲的消息持久化!!! // 4.//队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除. // 5.//如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完) //创建一条信息 $msg = new AMQPMessage('如果我是沙兄+++'.$num); //推送一条消息到队列中 $res = $channel->basic_publish($msg, '', 'hello'); //通道关闭 $channel->close(); //连接关闭 $connection->close(); } }



    //消费者 要加多个消费者 创建多个这样的文件运行就行 rabbitMq默认是公平分配的方式 --轮询(平均分配)  就是说你有100条消息 消费者a已经消费完单数的50条 消费者b才消费完双数的10条 那也还有40条等着消费者b去消费,不会派给消费者a的

    <?php
    require_once __DIR__ . '/autoload.php'; //因为消费者一般来说是在命令行执行 所以要引入这个类
    use PhpAmqpLibConnectionAMQPStreamConnection;
            //创建一个连接
            $connection = new AMQPStreamConnection('***.***.***.***', 5672, 'mq用户名', 'mq密码');
            //创建一个通道
            $channel = $connection->channel();
            //声明一个队列是幂等的,它仅仅在不存在的情况下被创建
            $channel->queue_declare('hello', false, false, false, false);
            //拉取队列中的消息
            $res = $channel->basic_consume('hello', '', false, false, false, false, function ($msg) {
            //  file_put_contents('get.text', '这是消息2'.$msg->body."
    ", FILE_APPEND);
             echo '说:'.$msg->body."
    ";
             sleep(3);
                try {
                    var_dump($msg);
                    //这里是上面第四个消息回复设置为false 之后需要手动确认 不然的话消息不会删掉
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    
                }catch(Exception $e) {
                    return false;
                }
            });
    
    /**
     * 1.$queue  队列名
     * 2.$consumer_tag  消费者名称,自定义,可以为空
     * 3.$no_local  功能属于AMQP协议的标准,但是rabbitMQ并没有做实现。
     * 4.$no_ack  自动应答,当自动应答等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消息就会立即从队列中删除。一般设置为false,由消费者手动进行ack。
     * 5.$exclusive  是否排外(排外:queue只被一个消费者使用并且在消费者断开连接时queue被删除),一般默认为false。
     * 6.$nowait  当nowait为true时,不要等待服务器确认请求就立即开始消费消息。如果不能消费,有可能引发通道异常并关闭通道。一般默认设置为false。
     * 7.$callback  回调函数,拿到消息你的业务逻辑就写在里面。
     */
    
            while(count($channel->callbacks)) {
                $channel->wait();
            }

    //队列持久化和消息持久化和公平调度

    如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

    首先,为了不让队列消失,需要把队列声明为持久化(durable)。为此我们通过queue_declare的第三参数为true: (这里生产者和消费者都必须改,上面我们已经声明了一个hello的队列是不持久化的,所以这里要重新声明一个新的队列,不然会报错)

    $channel->queue_declare('shaxiong', false, true, false, false);

    这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。另外,我们需要把我们的消息也要设为持久化——设置delivery_mode = 2。(在生产者创建消息的时候设置)

    $msg = new AMQPMessage($data,
           array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
           );
    //注意:消息持久化
    //将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)
    //——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你可以使用publisher confirms。--https://www.rabbitmq.com/confirms.html
    //推送一条消息到队列中

    公平调度(多劳多得)

    在创建完通道之后加上这么一行代码(生产者和消费者都需要添加) 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

    $channel->basic_qos(null, 1, null); //rabbitMq 削峰的表现

    可以看到 100条消息 我消费者a消费了99条,有些没截图到 消费者b只消费了一条

  • 相关阅读:
    Angular 学习笔记(四)
    Angular 学习笔记(三)
    Angular 学习笔记(二)
    Angular 学习笔记(一)
    ettercap + driftnet 实现同网段下流量欺骗
    kali 下使用 arpspoof 实现断网攻击
    JavaScript 语句
    vscode配置git及码云
    区块链入门
    C#情怀与未来
  • 原文地址:https://www.cnblogs.com/wqxq/p/14680291.html
Copyright © 2011-2022 走看看