zoukankan      html  css  js  c++  java
  • 【原创】RabbitMQ教程:php实现

    参考:https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/ (php-amqplib扩展库:demo

    参考:https://zhuanlan.zhihu.com/p/63700605 (知乎)

    简介:

    不要用的太复杂,一般定义好一个直连交换机(route_key)和一个扇形交换机就可以用了。

    生产端使用流程:

    1,创建连接:connection

    $connection = new AMQPStreamConnection('192.168.124.39', 5672, 'fort', 'fort');

    2,创建信道:channel

    $channel = $connection->channel();

    3,创建消息:message

    $msg = new AMQPMessage('Hello World!');
    
    $msg = new AMQPMessage($data,
           array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
           );//持久化的消息,与queue都需要定义好持久化参数

    4,创建队列:queue

    #空交换机的时候才需要定义queue
    $channel->queue_declare('hello', false, false, false, false);
    
    $channel->queue_declare('task_queue', false, true, false, false);//持久的消息,第3个参数

    5,创建交换机:exchange

    #定义了交换机就不需要定义queue
    $channel
    ->exchange_declare('logs', 'fanout', false, false, false); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $channel->exchange_declare('topic_logs', 'topic', false, false, false);

    6,发布消息:publish

    $channel->basic_publish($msg, '', 'task_queue');
    
    $channel->basic_publish($msg, 'logs');//扇形交换机
    
    $channel->basic_publish($msg, 'direct_logs', $severity);//直连交换机
    
    $channel->basic_publish($msg, 'topic_logs', $routing_key);//主题交换机

    7,关闭信道和连接:

    $channel->closs();
    $connection->close();

    消费端使用流程

    1,创建连接:connection

    $connection = new AMQPStreamConnection('192.168.124.39', 5672, 'fort', 'fort');

    2,创建信道:channel

    $channel = $connection->channel();

    3,申明队列:queue(消费端必须有queue)

    //空交换机时,不需要申明交换机
    $channel->queue_declare('hello', false, false, false, false);
    
    //扇形,直连,主题交换机时
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); 

    4,申明交换机:exchange

    //扇形交换机
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    //直连交换机
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    
    //主题交换机
    $channel->exchange_declare('topic_logs', 'topic', false, false, false);

    5,交换机绑定到队列:

    //绑定扇形交换机
    $channel->queue_bind($queue_name, 'logs');
    
    //绑定直连交换机
    foreach($severities as $severity) {
        $channel->queue_bind($queue_name, 'direct_logs', $severity);
    }
    
    //绑定主题交换机
    foreach($binding_keys as $binding_key) {
        $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    }

    6,消费队列;

    //空交换机时
    $channel->basic_consume('hello', '', false, true, false, false, $callback);
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    //扇形,直连,主题交换机时
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    while(count($channel->callbacks)) {
        $channel->wait();
    }

    7,关闭信道和连接:

    $channel->closs();
    $connection->close();
  • 相关阅读:
    【LeetCode】456. 132 Pattern
    【Python&Sort】QuickSort
    Python虚拟环境的配置
    【LeetCode】459. Repeated Substring Pattern
    【LeetCode】462. Minimum Moves to Equal Array Elements II
    【LeetCode】20. Valid Parentheses
    radio 获取选中值
    页面加载时自动执行(加载)js的几种方法
    加一个字段: updateTime 更新时间
    多用户同时处理同一条数据解决办法
  • 原文地址:https://www.cnblogs.com/tkzc2013/p/13547205.html
Copyright © 2011-2022 走看看