zoukankan      html  css  js  c++  java
  • 【RabbitMQ 实战指南】一 RabbitMQ 开发

    1、RabbitMQ 安装

    RabbitMQ 的安装可以参考官方文档:https://www.rabbitmq.com/download.html

    2、管理页面

    rabbitmq-management插件提供基于HTTP的API方式管理和监控你的RabbitMQ服务器。

    2.1、开启 rabbitmq_management 插件

    rabbitmq-plugins enable rabbitmq_management

    windows 下运行如下命令

    rabbitmq-plugins.bat enable rabbitmq_management

    2.2、登录管理页面

    浏览器输入: http://localhost:15672 , 输入用户名和密码(默认为guest)。管理页面如下图:

     

    3、php 安装 amqp 扩展

    3.1、 下载对应的扩展

    下载地址: https://pecl.php.net/package/amqp

    假如需要安装 amqp-1.9.4

    wget http://pecl.php.net/get/amqp-1.9.4.tgz

    3.2、安装

    tar -zxvf amqp-1.9.4.tgz
    cd cd amqp-1.9.4/
    ./configure --with-php-config=/path/to/php-config
    make && make install
    vim /path/to/php.ini
    extension=amqp.so

    3.3、验证

    php -m | grep amqp

    4、开发

    项目采用 composer 管理依赖, 对应代码地址:https://github.com/SevenParadise/php-examples/tree/master/mq/rabbitmq/sample

    4.1、安装php-amqplib依赖

    composer require php-amqplib/php-amqplib

    4.2、生产者代码

    <?php 
    require __DIR__ . '/../../../vendor/autoload.php';
    
    use PhpAmqpLibMessageAMQPMessage;
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    // todo 换成自己的配置
    $host = '192.168.33.1';
    $port = 5672;
    $username = 'zhangcs';
    $password = 'zhangcs';
    $vhost = '/';
    
    // 1、连接到 RabbitMQ Broker,建立一个连接
    $connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
    
    // 2、开启一个通道
    $channel = $connection->channel();
    
    $exchange = 'test_exchange';
    $queue = 'test_queue';
    // 3、声明一个交换器,并且设置相关属性
    $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    
    // 4、声明一个队列, 并且设置相关属性
    $channel->queue_declare($queue, false, true, false, false);
    
    // 5、通过路由键将交换器和队列绑定起来
    $channel->queue_bind($queue, $exchange);
    
    $body = $argv[1] ?? 'Hello RabbitMQ';
    
    // 6、初始化消息,并且持久化消息
    $message = new AMQPMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]);
    
    // 7、将消息发送到 RabbitMQ Broker
    $channel->basic_publish($message, $exchange);
    
    // 8、关闭通道
    $channel->close();
    // 9、关闭连接
    $connection->close();

    4.3、消费者代码

    <?php 
    require __DIR__ . '/../../../vendor/autoload.php';
    
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    // todo 换成自己的配置
    $host = '192.168.33.1';
    $port = 5672;
    $username = 'zhangcs';
    $password = 'zhangcs';
    $vhost = '/';
    
    // 1、连接到 RabbitMQ Broker,建立一个连接
    $connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
    
    // 2、开启一个通道
    $channel = $connection->channel();
    
    $exchange = 'test_exchange';
    $queue = 'test_queue';
    
    // 3、声明一个交换器,并且设置相关属性
    $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    
    // 4、声明一个队列, 并且设置相关属性
    $channel->queue_declare($queue, false, true, false, false);
    
    // 5、通过路由键将交换器和队列绑定起来
    $channel->queue_bind($queue, $exchange);
    
    function process_message($message)
    {
        echo "
    --------
    ";
        echo $message->body;
        echo "
    --------
    ";
    
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }
    
    // 6、消费消息,并且设置回调函数为 process_message
    $channel->basic_consume($queue, 'hahah', false, false, false, false, 'process_message');
    
    // 7、注册终止函数,关闭通道,关闭连接
    function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
    }
    register_shutdown_function('shutdown', $channel, $connection);
    
    // 8、一直阻塞消费数据
    while ($channel ->is_consuming()) {
        $channel->wait();
    }

    4.4、运行

    运行消费者

    $ php mq/rabbitmq/sample/consumer.php

    运行生产者

    $ php mq/rabbitmq/sample/producer.php 'message'
    # 关闭消费者
    $ php mq/rabbitmq/sample/producer.php 'quit'

    消费结果如下:

    5、方法详解

    5.1、exchange_declare 方法

    声明交换器方法,函数声明如下

    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
    • $exchange: 交换器名称
    • $type: 交换器类型,常见的如 fanout、direct、topic,详情见:【RabbitMQ 实战指南】一 RabbitMQ入门
    • $passive: 判断交换器是否存在,当设置为true是,然后交换器不存在时,会抛出异常
    • $durable: 设置是否持久化。durable 设置为true表示持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关消息。
    • $auto_delete: 设置是否自动删除,auto_delete 设置为 true 则表示自动删除。自动 删除的前提是至少有一个队列或者交换器与这个交换器绑定 ,之后所有与这个交换器绑 定的队列或者交换器都与 此解绑。注意不能错误地把这个参数理解为 : "当与此交换器 连接的客户端都断开时 , RabbitMQ 会自动 删 除本交换器 "。 
    • $internal: 设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程 序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。 
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    5.2、queue_declare 方法

    声明队列方法,函数声明如下

    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) 
    • $queue: 队列名称
    • $passive: 判断队列是否存在,当设置为true是,然后队列不存在时,会抛出异常
    • $durable: 设置是否持久化。durable 设置为true表示持久化。持久化可以将队列存盘,在服务器重启的时候不会丢失相关消息。
    • $exclusive: 设置为排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问统一连接创建的排他队列;"首次" 是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用与一个客户端同时发送和读取消息的应用场景。
    • $auto_delete: 设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队 列连接时,都不会自动删除这个队列。 
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    5.3、queue_bind 方法

    绑定队列和交换器方法,函数声明如下

    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
    • $queue: 队列名称
    • $exchange: 交换器名称
    • $routing_key : 用来绑定队列和交换器的路由键
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    5.4、exchange_bind 方法

    我们不仅可以将队列和交换器绑定,也可以将交换器和交换器绑定,函数声明如下

    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
    • $destination: 目标交换器名称
    • $source: 源交换器名称
    • $routing_key : 用来源交换器和目标交换器的路由键
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    生产者发送消息至交换器 source 中, 交换器 source 根据路由键找到与其匹配的另一个交换器 destination, 并把消息转发到 destination 中,进而存储在 destination 绑定的 queue 中,如下图:

    5.5、basic_publish

    发送消息方法,函数声明如下

    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    )
    • $msg: 需要发送的消息,PhpAmqpLibMessageAMQPMessage 对象,可以设置特定属性,比如消息是否持久化,消息的优先级
    • $exchange: 交换器的名称,指明消息需要发送到哪个交换器中,如果设置为空字符串,会发送给 RabbitMQ 默认的交换器 " AMQP default " 中
    • $routing_key: 路由键
    •  $mandatory: 当 mandatory 设置为true 时,交换器无法通过自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 参 数设置为 false 时,出现上述情形,则消息直接被丢弃 。 生产者可以通过添加一个监听器监听消息是否正确路由到队列中
    • $immediate: 当 imrnediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在 任何消费者,那么这条消息将不会存入队列中。当与路由键匹配 的所有队列都没有消费者时 , 该消息会至生产者。 

    5.6、basic_consume

    消费消息方法,函数声明如下

    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) 
    • $queue: 队列名称
    • $consumer_tag: 消费者标签,用来区分多个消费者
    • $no_local: 设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
    • $no_ack: 设置为自动确认,详细可参考连接: RabbitMQ 之 no_ack 分析
    • $exclusive: 是否设置为排他
    • $callback: 设置消费者的回调函数。用于处理 RabbitMQ 推送过来的消息
  • 相关阅读:
    函数节流和防抖
    前端优化
    webpack模块
    link和@import的区别
    BFC--CSS
    javaoop_破解jdbc
    javaoop反射
    java-oop集合与泛型
    java中几个小遗漏
    java异常处理和日志管理
  • 原文地址:https://www.cnblogs.com/Zhangcsc/p/11677281.html
Copyright © 2011-2022 走看看