zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记3-使用topic交换器

    topic的路由规则里使用【.】号分隔单词,使用【*】号匹配1个单词,使用【#】匹配多个.和多个*。

    在下面的例子中:

    logger.*可以匹配logger.error和logger.warning,但logger*.error只能匹配logger.error

    logger#可以匹配到logger.error和logger.warning。

    下面的例子使用topic接收警告、错误的日志,并根据匹配的路由规则发送给不同的Queue队列来处理的例子:

    日志生产者SenderWithTopicExchange

     1 package com.yzl.test2;
     2 
     3 import java.util.concurrent.CountDownLatch;
     4 import java.util.concurrent.ExecutorService;
     5 import java.util.concurrent.Executors;
     6 
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 /**
    12  * 使用topic交换器发送消息
    13  * 分为警告和错误2种日志
    14  * @author: yzl
    15  * @date: 2016-10-22
    16  */
    17 public class SenderWithTopicExchange {
    18     //交换器名称
    19     private static final String EXCHANGE_NAME = "myTopicExchange";
    20     //路由键的前缀
    21     private static final String BASE_ROUTING_KEY = "logger.";
    22     
    23     public static void main(String[] args) throws Exception {
    24         //使用CountDownLatch控制2个线程一起运行
    25         final CountDownLatch cdl = new CountDownLatch(2);
    26         //连接到rabbitmq服务器
    27         ConnectionFactory factory = new ConnectionFactory();
    28         factory.setHost("localhost");
    29         Connection connection = factory.newConnection();
    30         //创建一个信道
    31         final Channel channel = connection.createChannel();
    32         //定义一个名字为topicExchange的topic类型的exchange
    33         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    34         
    35         ExecutorService pool = Executors.newFixedThreadPool(2);
    36         pool.submit(new Runnable() {
    37             @Override
    38             public void run() {
    39                 try {
    40                     cdl.await();
    41                     //发送警告日志,绑定路由键:logger.warning
    42                     String warningMsg = "warning message is :";
    43                     for(int i=1; i<800; i++){
    44                         System.out.println("发送警告消息:" + warningMsg+i);
    45                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());
    46                         Thread.sleep(2000L);
    47                     }
    48                 } catch (Exception e) {
    49                     e.printStackTrace();
    50                 }
    51             }
    52         });
    53         pool.submit(new Runnable() {
    54             @Override
    55             public void run() {
    56                 try {
    57                     cdl.await();
    58                     //发送错误日志,绑定路由键:logger.error
    59                     String errorMsg = "error message is :";
    60                     for(int i=1; i<1000; i++){
    61                         System.out.println("发送错误消息:" + errorMsg+i);
    62                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());
    63                         Thread.sleep(2000L);
    64                     }
    65                 } catch (Exception e) {
    66                     e.printStackTrace();
    67                 }
    68             }
    69         });
    70         
    71         cdl.countDown();
    72         cdl.countDown();
    73     }
    74 }

    消息消费者ReceiverWithTopicExchange

     1 package com.yzl.test2;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.AMQP.BasicProperties;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 
    12 /**
    13  * 使用topic交换器接收消息
    14  * 
    15  * @author: yzl
    16  * @date: 2016-10-22
    17  */
    18 public class ReceiverWithTopicExchange {
    19     // 交换器名称
    20     private static final String EXCHANGE_NAME = "myTopicExchange";
    21 
    22     public static void main(String[] args) throws Exception {
    23         // 连接到rabbitmq服务器
    24         ConnectionFactory factory = new ConnectionFactory();
    25         factory.setHost("localhost");
    26         Connection connection = factory.newConnection();
    27         // 创建一个信道
    28         final Channel channel = connection.createChannel();
    29         // 定义一个名字为topicExchange的topic类型的exchange
    30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    31         
    32         //定义接收警告消息的队列
    33         channel.queueDeclare("warningQueue", false, false, false, null);
    34         //定义接收错误消息的队列
    35         channel.queueDeclare("errorQueue", false, false, false, null);
    36         //定义接收所有级别日志消息的队列
    37         channel.queueDeclare("allLoggerQueue", false, false, false, null);
    38         
    39         //使用logger.warning路由键绑定myTopicExchange与warningQueue
    40         channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");
    41         //使用logger.error路由键绑定myTopicExchange与errorQueue
    42         channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");
    43         //使用logger.*路由规则绑定myTopicExchange与allLoggerQueue
    44         channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*");
    45         
    46         //只处理警告日志,使用手动ack确认
    47         channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){
    48             @Override
    49             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    50                     throws IOException {
    51                 String msg = new String(body);
    52                 System.out.println("warningQueue accept a warning msg :" + msg);
    53                 channel.basicAck(envelope.getDeliveryTag(), false);
    54             }
    55         });
    56         //只处理错误日志,使用手动ack确认
    57         channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){
    58             @Override
    59             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    60                     throws IOException {
    61                 String msg = new String(body);
    62                 System.out.println("errorQueue accept a error msg :" + msg);
    63                 channel.basicAck(envelope.getDeliveryTag(), false);
    64             }
    65         });
    66         //处理警告和错误日志,使用手动ack确认
    67         channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){
    68             @Override
    69             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    70                     throws IOException {
    71                 String msg = new String(body);
    72                 System.out.println("allLoggerQueue accept a logger msg :" + msg);
    73                 channel.basicAck(envelope.getDeliveryTag(), false);
    74             }
    75         });
    76     }
    77 }

    结果输出:

    发送警告消息:warning message is :1
    发送错误消息:error message is :1
    发送警告消息:warning message is :2
    发送错误消息:error message is :2
    发送错误消息:error message is :3
    发送警告消息:warning message is :3
    allLoggerQueue accept a logger msg :error message is :1
    allLoggerQueue accept a logger msg :warning message is :1
    errorQueue accept a error msg :error message is :1
    warningQueue accept a warning msg :warning message is :1
    warningQueue accept a warning msg :warning message is :2
    errorQueue accept a error msg :error message is :2
    allLoggerQueue accept a logger msg :warning message is :2
    allLoggerQueue accept a logger msg :error message is :2
    allLoggerQueue accept a logger msg :warning message is :3
    errorQueue accept a error msg :error message is :3
    warningQueue accept a warning msg :warning message is :3
    allLoggerQueue accept a logger msg :error message is :3

    消息处理流程:

  • 相关阅读:
    【VUE3.0体验】关于路由的一些坑
    TensorFlow中的卷积函数
    TensorFlow源码安装
    ubuntu远程桌面
    TensorFlow图像处理API
    C程序员眼里的Python
    深度剖析HashMap的数据存储实现原理(看完必懂篇)
    golang 互斥锁和读写锁
    golang goroutine的调度
    golang channel的使用以及调度原理
  • 原文地址:https://www.cnblogs.com/yangzhilong/p/5987566.html
Copyright © 2011-2022 走看看