zoukankan      html  css  js  c++  java
  • rabbitmq AmqpClient 使用Topic 交换机同一个channel 同时多个队列 ,多个交换机,C++代码示例

      1 // 消息发送
      2 bool PublishExchangeTopicMulti(const std::string &strUri)
      3 {
      4     AmqpClient::Channel::ptr_t channel = 
      5         AmqpClient::Channel::CreateFromUri(strUri);
      6 
      7     if (channel == nullptr)
      8     {
      9         return false;
     10     }
     11 
     12     // 声明交换机,若不存在则创建 
     13     std::string strTopicExchange1 = "topic_exchange_1";
     14     std::string strTopicExchange2 = "topic_exchange_2";
     15     channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     16     channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     17 
     18     while (true)
     19     {
     20         // 可输入例如 "topic_exchange_1 disk.info 666"
     21         // "topic_exchange_2 any.warning 123"
     22         std::cout << "请输入[exchange] [routing_key1.routing_key2] [message]: " << std::endl;
     23     
     24         std::string strExchange;
     25         std::string severity;
     26         std::string message;
     27         std::cin >> strExchange;
     28         std::cin >> severity;
     29         std::cin >> message;
     30 
     31         channel->BasicPublish(strExchange, severity,
     32             AmqpClient::BasicMessage::Create(message));
     33 
     34         std::cout << "[X] to " << strExchange << ", send " 
     35             << severity << ": " << message << std::endl;
     36     }
     37 
     38 }
     39 
     40 
     41 void ReceiveTopicExchangeMulti(const std::string &strUri)
     42 {
     43     AmqpClient::Channel::ptr_t channel = 
     44         AmqpClient::Channel::CreateFromUri(strUri);
     45 
     46     if (channel == nullptr)
     47     {
     48         return ;
     49     }
     50 
     51     // 这里我们声明两个交换机,类型均为topic , 我们将通过不同的队列从两个交换机中取消息。
     52     // 这里交换机的名称需要与发送端的保持一致。
     53     std::string strTopicExchange1 = "topic_exchange_1";
     54     std::string strTopicExchange2 = "topic_exchange_2";
     55     channel->DeclareExchange(strTopicExchange1, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     56     channel->DeclareExchange(strTopicExchange2, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC);
     57 
     58     // 这里我们声明了三个队列,第一个队列从交换机1 取数据第二三个队列从交换机2 取数据;
     59     // 但是第二三个队列所绑定的routing_key 有所不同。
     60     // 当然了,routing_key 也可以相同,这样的话相同routing_key 的消息就会在两个队列中都出现。
     61     std::string strTopicQueue_1 = "topic_queue_1";
     62     std::string strTopicQueue_2 = "topic_queue_2";
     63     std::string strTopicQueue_3 = "topic_queue_3";
     64     // 第一个参数若为空,则系统默认生成随机队列名称
     65     // 第三个参数 表明队列是否持久的。true: 服务器重启将会保留该Exchange,
     66     // 警告,若只设置此项,不代表消息的持久化。即不保证重启后消息还在。
     67     channel->DeclareQueue(strTopicQueue_1, false, true, false, false);
     68     channel->DeclareQueue(strTopicQueue_2, false, true, false, false);
     69     channel->DeclareQueue(strTopicQueue_3, false, true, false, false);
     70 
     71     // 队列绑定我们感兴趣的routing_key, 表示我们只接收这些routing_key 相关的消息。
     72     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "*.info");
     73     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "disk.*");
     74     channel->BindQueue(strTopicQueue_1, strTopicExchange1, "info.error");
     75 
     76     // 在交换机2 上面我们绑定了队列2 和队列3 。但是它们所关心的routing_key 不同。
     77     channel->BindQueue(strTopicQueue_3, strTopicExchange2, "*.info");
     78     channel->BindQueue(strTopicQueue_3, strTopicExchange2, "disk.*");
     79     channel->BindQueue(strTopicQueue_2, strTopicExchange2, "info.error");
     80 
     81     // 创建消费者标志,这个在后面会告诉 channel 我们需要哪些队列中的相关routing_key 的消息。
     82     // BasicConsume() 第五个参数是指该消息是否以独占的方式处理,若是则不允许第二个消费者绑定到该队列 上,
     83     // 若否,则多个消费者同时绑定到该队列,那么 在该队列上的消息将随机分配到某一个消费者。
     84     // 即,同一个消息将不会同时出现 在两个消费者身上。
     85     std::string strFlagConsume_1 = "tab_consume_1";
     86     std::string strFlagConsume_2 = "tab_consume_2";
     87     std::string strFlagConsume_3 = "tab_consume_3";
     88     channel->BasicConsume(strTopicQueue_1, strFlagConsume_1, true, false, true);
     89     channel->BasicConsume(strTopicQueue_2, strFlagConsume_2, true, false, true);
     90     channel->BasicConsume(strTopicQueue_3, strFlagConsume_3, true, false, true);
     91     // BasicConsume() 的第4 个参数为false 表示,我们需要主动ack 服务器才将该消息清除。
     92 
     93     std::vector<std::string> vecFlagConsume;
     94     vecFlagConsume.push_back(strFlagConsume_1);
     95     vecFlagConsume.push_back(strFlagConsume_2);
     96     vecFlagConsume.push_back(strFlagConsume_3);
     97 
     98     while (true)
     99     {
    100         AmqpClient::Envelope::ptr_t envelope = channel->BasicConsumeMessage(vecFlagConsume);
    101 
    102         std::string strExchange = envelope->Exchange();
    103         std::string strFlagConsume = envelope->ConsumerTag();
    104         std::string severity = envelope->RoutingKey();
    105         std::string buffer = envelope->Message()->body();
    106 
    107         std::cout << "[Y] exchange: " << strExchange << ", flagconsume: " << strflagConsume 
    108             << ", receive " << severity << ": " << buffer << std::endl;
    109 
    110         channel->BasicAck(envelope);
    111     }
    112 
    113     for (size_t i = 0; i < vecFlagConsume.size(); ++i)
    114         channel->BasicCancel(vecFlagConsume[i]);
    115 }
  • 相关阅读:
    How to use Log4j 2 with Spring Boot
    SpringBoot使用Redis缓存
    win10下安装redis
    mysql 8.0.15修改密码
    spring-boot集成redis实现缓存功能
    Spring Boot(三):Spring Boot 中 Redis 的使用
    关于 iView-cdn模式
    HTML5 Audio-使用 Media 事件添加进度栏
    vue-cli中自定义路径别名 assets和static文件夹的区别
    CH9001: 各浏览器对常用或者错误的 Content-Type 类型处理方式不一致
  • 原文地址:https://www.cnblogs.com/suyunhong/p/9257482.html
Copyright © 2011-2022 走看看