zoukankan      html  css  js  c++  java
  • 【RabbitMQ 实战指南】一 RabbitMQ 开发

    1、RabbitMQ 安装

    RabbitMQ 的安装可以参考官方文档:https://www.rabbitmq.com/download.html

    2、管理页面

    rabbitmq-management插件提供基于HTTP的API方式管理和监控你的RabbitMQ服务器。

    2.1、开启 rabbitmq_management 插件

    rabbitmq-plugins enable rabbitmq_management

    windows 下运行如下命令

    rabbitmq-plugins.bat enable rabbitmq_management

    2.2、登录管理页面

    浏览器输入: http://localhost:15672 , 输入用户名和密码(默认为guest)。管理页面如下图:

     

    3、php 安装 amqp 扩展

    3.1、 下载对应的扩展

    下载地址: https://pecl.php.net/package/amqp

    假如需要安装 amqp-1.9.4

    wget http://pecl.php.net/get/amqp-1.9.4.tgz

    3.2、安装

    tar -zxvf amqp-1.9.4.tgz
    cd cd amqp-1.9.4/
    ./configure --with-php-config=/path/to/php-config
    make && make install
    vim /path/to/php.ini
    extension=amqp.so

    3.3、验证

    php -m | grep amqp

    4、开发

    项目采用 composer 管理依赖, 对应代码地址:https://github.com/SevenParadise/php-examples/tree/master/mq/rabbitmq/sample

    4.1、安装php-amqplib依赖

    composer require php-amqplib/php-amqplib

    4.2、生产者代码

    <?php 
    require __DIR__ . '/../../../vendor/autoload.php';
    
    use PhpAmqpLibMessageAMQPMessage;
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    // todo 换成自己的配置
    $host = '192.168.33.1';
    $port = 5672;
    $username = 'zhangcs';
    $password = 'zhangcs';
    $vhost = '/';
    
    // 1、连接到 RabbitMQ Broker,建立一个连接
    $connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
    
    // 2、开启一个通道
    $channel = $connection->channel();
    
    $exchange = 'test_exchange';
    $queue = 'test_queue';
    // 3、声明一个交换器,并且设置相关属性
    $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    
    // 4、声明一个队列, 并且设置相关属性
    $channel->queue_declare($queue, false, true, false, false);
    
    // 5、通过路由键将交换器和队列绑定起来
    $channel->queue_bind($queue, $exchange);
    
    $body = $argv[1] ?? 'Hello RabbitMQ';
    
    // 6、初始化消息,并且持久化消息
    $message = new AMQPMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]);
    
    // 7、将消息发送到 RabbitMQ Broker
    $channel->basic_publish($message, $exchange);
    
    // 8、关闭通道
    $channel->close();
    // 9、关闭连接
    $connection->close();

    4.3、消费者代码

    <?php 
    require __DIR__ . '/../../../vendor/autoload.php';
    
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    // todo 换成自己的配置
    $host = '192.168.33.1';
    $port = 5672;
    $username = 'zhangcs';
    $password = 'zhangcs';
    $vhost = '/';
    
    // 1、连接到 RabbitMQ Broker,建立一个连接
    $connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
    
    // 2、开启一个通道
    $channel = $connection->channel();
    
    $exchange = 'test_exchange';
    $queue = 'test_queue';
    
    // 3、声明一个交换器,并且设置相关属性
    $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    
    // 4、声明一个队列, 并且设置相关属性
    $channel->queue_declare($queue, false, true, false, false);
    
    // 5、通过路由键将交换器和队列绑定起来
    $channel->queue_bind($queue, $exchange);
    
    function process_message($message)
    {
        echo "
    --------
    ";
        echo $message->body;
        echo "
    --------
    ";
    
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }
    
    // 6、消费消息,并且设置回调函数为 process_message
    $channel->basic_consume($queue, 'hahah', false, false, false, false, 'process_message');
    
    // 7、注册终止函数,关闭通道,关闭连接
    function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
    }
    register_shutdown_function('shutdown', $channel, $connection);
    
    // 8、一直阻塞消费数据
    while ($channel ->is_consuming()) {
        $channel->wait();
    }

    4.4、运行

    运行消费者

    $ php mq/rabbitmq/sample/consumer.php

    运行生产者

    $ php mq/rabbitmq/sample/producer.php 'message'
    # 关闭消费者
    $ php mq/rabbitmq/sample/producer.php 'quit'

    消费结果如下:

    5、方法详解

    5.1、exchange_declare 方法

    声明交换器方法,函数声明如下

    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
    • $exchange: 交换器名称
    • $type: 交换器类型,常见的如 fanout、direct、topic,详情见:【RabbitMQ 实战指南】一 RabbitMQ入门
    • $passive: 判断交换器是否存在,当设置为true是,然后交换器不存在时,会抛出异常
    • $durable: 设置是否持久化。durable 设置为true表示持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关消息。
    • $auto_delete: 设置是否自动删除,auto_delete 设置为 true 则表示自动删除。自动 删除的前提是至少有一个队列或者交换器与这个交换器绑定 ,之后所有与这个交换器绑 定的队列或者交换器都与 此解绑。注意不能错误地把这个参数理解为 : "当与此交换器 连接的客户端都断开时 , RabbitMQ 会自动 删 除本交换器 "。 
    • $internal: 设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程 序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。 
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    5.2、queue_declare 方法

    声明队列方法,函数声明如下

    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) 
    • $queue: 队列名称
    • $passive: 判断队列是否存在,当设置为true是,然后队列不存在时,会抛出异常
    • $durable: 设置是否持久化。durable 设置为true表示持久化。持久化可以将队列存盘,在服务器重启的时候不会丢失相关消息。
    • $exclusive: 设置为排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问统一连接创建的排他队列;"首次" 是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用与一个客户端同时发送和读取消息的应用场景。
    • $auto_delete: 设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队 列连接时,都不会自动删除这个队列。 
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    5.3、queue_bind 方法

    绑定队列和交换器方法,函数声明如下

    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
    • $queue: 队列名称
    • $exchange: 交换器名称
    • $routing_key : 用来绑定队列和交换器的路由键
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    5.4、exchange_bind 方法

    我们不仅可以将队列和交换器绑定,也可以将交换器和交换器绑定,函数声明如下

    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )
    • $destination: 目标交换器名称
    • $source: 源交换器名称
    • $routing_key : 用来源交换器和目标交换器的路由键
    • $nowait: 设置是否需要等待服务器返回回执消息,默认为false

    生产者发送消息至交换器 source 中, 交换器 source 根据路由键找到与其匹配的另一个交换器 destination, 并把消息转发到 destination 中,进而存储在 destination 绑定的 queue 中,如下图:

    5.5、basic_publish

    发送消息方法,函数声明如下

    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    )
    • $msg: 需要发送的消息,PhpAmqpLibMessageAMQPMessage 对象,可以设置特定属性,比如消息是否持久化,消息的优先级
    • $exchange: 交换器的名称,指明消息需要发送到哪个交换器中,如果设置为空字符串,会发送给 RabbitMQ 默认的交换器 " AMQP default " 中
    • $routing_key: 路由键
    •  $mandatory: 当 mandatory 设置为true 时,交换器无法通过自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 参 数设置为 false 时,出现上述情形,则消息直接被丢弃 。 生产者可以通过添加一个监听器监听消息是否正确路由到队列中
    • $immediate: 当 imrnediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在 任何消费者,那么这条消息将不会存入队列中。当与路由键匹配 的所有队列都没有消费者时 , 该消息会至生产者。 

    5.6、basic_consume

    消费消息方法,函数声明如下

    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) 
    • $queue: 队列名称
    • $consumer_tag: 消费者标签,用来区分多个消费者
    • $no_local: 设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
    • $no_ack: 设置为自动确认,详细可参考连接: RabbitMQ 之 no_ack 分析
    • $exclusive: 是否设置为排他
    • $callback: 设置消费者的回调函数。用于处理 RabbitMQ 推送过来的消息
  • 相关阅读:
    codeforces C. Fixing Typos 解题报告
    codeforces B. The Fibonacci Segment 解题报告
    codeforces B. Color the Fence 解题报告
    codeforces B. Petya and Staircases 解题报告
    codeforces A. Sereja and Bottles 解题报告
    codeforces B. Levko and Permutation 解题报告
    codeforces B.Fence 解题报告
    tmp
    API 设计 POSIX File API
    分布式跟踪的一个流行标准是OpenTracing API,该标准的一个流行实现是Jaeger项目。
  • 原文地址:https://www.cnblogs.com/Zhangcsc/p/11677281.html
Copyright © 2011-2022 走看看