zoukankan      html  css  js  c++  java
  • rabbitmq AmqpClient 使用Topic 交换机同一个channel 同时多个队列 ,多个交换机,C++代码示例

      1 // 消息发送
      2 bool PublishExchangeTopicMulti(const std::string &strUri)
      3 {
      4     AmqpClient::Channel::ptr_t channel = 
      5         AmqpClient::Channel::CreateFromUri(strUri);
      6 
      7     if (channel == nullptr)
      8     {
      9         return false;
     10     }
     11 
     12     // 声明交换机,若不存在则创建 
     13     std::string strTopicExchange1 = "topic_exchange_1";
     14     std::string strTopicExchange2 = "topic_exchange_2";
     15     channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     16     channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     17 
     18     while (true)
     19     {
     20         // 可输入例如 "topic_exchange_1 disk.info 666"
     21         // "topic_exchange_2 any.warning 123"
     22         std::cout << "请输入[exchange] [routing_key1.routing_key2] [message]: " << std::endl;
     23     
     24         std::string strExchange;
     25         std::string severity;
     26         std::string message;
     27         std::cin >> strExchange;
     28         std::cin >> severity;
     29         std::cin >> message;
     30 
     31         channel->BasicPublish(strExchange, severity,
     32             AmqpClient::BasicMessage::Create(message));
     33 
     34         std::cout << "[X] to " << strExchange << ", send " 
     35             << severity << ": " << message << std::endl;
     36     }
     37 
     38 }
     39 
     40 
     41 void ReceiveTopicExchangeMulti(const std::string &strUri)
     42 {
     43     AmqpClient::Channel::ptr_t channel = 
     44         AmqpClient::Channel::CreateFromUri(strUri);
     45 
     46     if (channel == nullptr)
     47     {
     48         return ;
     49     }
     50 
     51     // 这里我们声明两个交换机,类型均为topic , 我们将通过不同的队列从两个交换机中取消息。
     52     // 这里交换机的名称需要与发送端的保持一致。
     53     std::string strTopicExchange1 = "topic_exchange_1";
     54     std::string strTopicExchange2 = "topic_exchange_2";
     55     channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     56     channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     57 
     58     // 这里我们声明了三个队列,第一个队列从交换机1 取数据第二三个队列从交换机2 取数据;
     59     // 但是第二三个队列所绑定的routing_key 有所不同。
     60     // 当然了,routing_key 也可以相同,这样的话相同routing_key 的消息就会在两个队列中都出现。
     61     std::string strTopicQueue_1 = "topic_queue_1";
     62     std::string strTopicQueue_2 = "topic_queue_2";
     63     std::string strTopicQueue_3 = "topic_queue_3";
     64     // 第一个参数若为空,则系统默认生成随机队列名称
     65     // 第三个参数 表明队列是否持久的。true: 服务器重启将会保留该Exchange,
     66     // 警告,若只设置此项,不代表消息的持久化。即不保证重启后消息还在。
     67     channel->DeclareQueue(strTopicQueue_1, false, true, false, false);
     68     channel->DeclareQueue(strTopicQueue_2, false, true, false, false);
     69     channel->DeclareQueue(strTopicQueue_3, false, true, false, false);
     70 
     71     // 队列绑定我们感兴趣的routing_key, 表示我们只接收这些routing_key 相关的消息。
     72     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "*.info");
     73     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "disk.*");
     74     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "info.error");
     75 
     76     // 在交换机2 上面我们绑定了队列2 和队列3 。但是它们所关心的routing_key 不同。
     77     channel->BindQueue(strTopicQueue_3, strTopicExchange2, "*.info");
     78     channel->BindQueue(strTopicQueue_3, strTopicExchange2, "disk.*");
     79     channel->BindQueue(strTopicQueue_2, strTopicExchange2, "info.error");
     80 
     81     // 创建消费者标志,这个在后面会告诉 channel 我们需要哪些队列中的相关routing_key 的消息。
     82     // BasicConsume() 第五个参数是指该消息是否以独占的方式处理,若是则不允许第二个消费者绑定到该队列 上,
     83     // 若否,则多个消费者同时绑定到该队列,那么 在该队列上的消息将随机分配到某一个消费者。
     84     // 即,同一个消息将不会同时出现 在两个消费者身上。
     85     std::string strFlagConsume_1 = "tab_consume_1";
     86     std::string strFlagConsume_2 = "tab_consume_2";
     87     std::string strFlagConsume_3 = "tab_consume_3";
     88     channel->BasicConsume(strTopicQueue_1, strFlagConsume_1, true, false, true);
     89     channel->BasicConsume(strTopicQueue_2, strFlagConsume_2, true, false, true);
     90     channel->BasicConsume(strTopicQueue_3, strFlagConsume_3, true, false, true);
     91     // BasicConsume() 的第4 个参数为false 表示,我们需要主动ack 服务器才将该消息清除。
     92 
     93     std::vector<std::string> vecFlagConsume;
     94     vecFlagConsume.push_back(strFlagConsume_1);
     95     vecFlagConsume.push_back(strFlagConsume_2);
     96     vecFlagConsume.push_back(strFlagConsume_3);
     97 
     98     while (true)
     99     {
    100         AmqpClient::Envelope::ptr_t envelope = channel->BasicConsumeMessage(vecFlagConsume);
    101 
    102         std::string strExchange = envelope->Exchange();
    103         std::string strFlagConsume = envelope->ConsumerTag();
    104         std::string severity = envelope->RoutingKey();
    105         std::string buffer = envelope->Message()->body();
    106 
    107         std::cout << "[Y] exchange: " << strExchange << ", flagconsume: " << strflagConsume 
    108             << ", receive " << severity << ": " << buffer << std::endl;
    109 
    110         channel->BasicAck(envelope);
    111     }
    112 
    113     for (size_t i = 0; i < vecFlagConsume.size(); ++i)
    114         channel->BasicCancel(vecFlagConsume[i]);
    115 }
  • 相关阅读:
    Java实现 蓝桥杯VIP 基础练习 回形取数
    Java实现 蓝桥杯VIP 基础练习 回形取数
    Java实现 蓝桥杯VIP 基础练习 回形取数
    Java实现 蓝桥杯VIP 基础练习 回形取数
    Java实现 蓝桥杯VIP 基础练习 报时助手
    Java实现 蓝桥杯VIP 基础练习 报时助手
    Java实现 蓝桥杯VIP 基础练习 报时助手
    Java实现 蓝桥杯VIP 基础练习 报时助手
    Java实现 蓝桥杯VIP 基础练习 报时助手
    block的是发送信号的线程,又不是处理槽函数的线程
  • 原文地址:https://www.cnblogs.com/suyunhong/p/9257482.html
Copyright © 2011-2022 走看看