zoukankan      html  css  js  c++  java
  • rabbitmq fanout模式(发布订阅)

    生产者
    // Producer: publishes ten messages to a fanout exchange with publisher
    // confirms enabled, then waits for the broker to ack/nack each of them.
    require_once './vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $exchangeName = 'exchange_fanout_1';

    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'test', 'test', '/');
    $channel = $connection->channel();

    // Invoked for every message the broker acknowledges (publish succeeded).
    $channel->set_ack_handler(
        function (AMQPMessage $message) {
            echo "成功: " . $message->body . PHP_EOL;
        }
    );

    // Invoked for every message the broker negatively acknowledges (publish failed).
    $channel->set_nack_handler(
        function (AMQPMessage $message) {
            echo "失败: " . $message->body . PHP_EOL;
        }
    );

    // Switch the channel into confirm mode so the handlers above are triggered.
    $channel->confirm_select();

    // Arguments after the type: passive=false, durable=false, auto_delete=false.
    $channel->exchange_declare($exchangeName, 'fanout', false, false, false);

    // No routing key is passed: a fanout exchange copies each message to every
    // queue bound to it. Queues must already be bound (start the consumers
    // first), otherwise the broker has nowhere to route the messages.
    for ($i = 0; $i < 10; $i++) {
        $msg = new AMQPMessage('消息: '.$i);
        $channel->basic_publish($msg, $exchangeName);
    }

    // Block until every published message has been acked or nacked, so the
    // handlers run before the channel is closed.
    $channel->wait_for_pending_acks();
    $channel->close();
    $connection->close();
    

      

    消费者
    // Consumer: declares the queue named by the first CLI argument, binds it to
    // the fanout exchange and prints every message delivered to that queue.
    // Run several instances with different queue names to see each of them
    // receive its own copy of every message.
    require_once './vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;

    // Fail early with a usage hint instead of declaring a queue from an
    // undefined argument.
    if (!isset($argv[1])) {
        fwrite(STDERR, "Usage: php customer.php <queue-name>\n");
        exit(1);
    }

    $exchangeName = 'exchange_fanout_1';
    $queueName = $argv[1];

    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'test', 'test', '/');
    $channel = $connection->channel();

    // Declare the exchange with the same settings the producer uses, so the
    // consumer can be started first.
    $channel->exchange_declare($exchangeName, 'fanout', false, false, false);
    // Arguments after the name: passive=false, durable=false, exclusive=false,
    // auto_delete=false.
    $channel->queue_declare($queueName, false, false, false, false);
    $channel->queue_bind($queueName, $exchangeName);

    echo " [*] Waiting for messages. To exit press CTRL+C\n";

    $callback = function ($msg) {
        echo ' [x] Received ', $msg->body, "\n";
    };

    // Fourth argument (no_ack) is true: deliveries are auto-acknowledged, so the
    // callback does not need to ack them.
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    // Keep processing deliveries for as long as the channel has active consumers.
    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
    

      php producter.php

    
    
    

           php customer.php  "queue_fanout_1"

  • 相关阅读:
    spark 修改默认log4j.properties 配置
    shell $x的含义
    hadoop hdfs 有内网、公网ip后,本地调试访问不了集群解决
    JAVA concurrent包下Semaphore、CountDownLatch等用法
    ETL DAG调度策略
    python2.7 Cheetah You don't have the C version of NameMapper installed
    python threading 用法
    log4j2 Filter用法详解
    ThreadLocal 原理及一些实现
    ETL hive update 之 deltamerge 优化
  • 原文地址:https://www.cnblogs.com/setevn/p/14645744.html
Copyright © 2011-2022 走看看