zoukankan      html  css  js  c++  java
  • RabbitMQ

    RabbitMQ - topic

    在publish/subscribe模式中使用fanout类型有个缺陷,就是不能选择性接收的消息。
    我们可以让consumer获得所有已发布的消息中指定的几个消息。


    在之前的例子中我们这样绑定exchange和队列:

    channel.queueBind(queueName, EXCHANGE_NAME, "");


    暂且不论该代码中绑定的exchange类型,这里空着的参数就是routing key。
    routing key的意义与exchange类型有关,比如使用fanout类型就会忽略掉routing key。


    而解决这一问题的就是direct类型。
    direct exchange并不复杂,只不过是producer和consumer双方的exchange对应时还需要对应routing key。

    以下代码中,同一个exchange和两个队列进行绑定,两个队列分别和不同的binding key绑定。
    (PS:当然,我们也可以将同一个routing key绑定给不同的队列也没有问题。)
    另外,SERVERITY变量是rounting数组,假设将日志通过exchange发送出去,consumer根据自己的需要获取不同级别的日志:

    复制代码
    final class ChannelFactory_{
        private final static ConnectionFactory connFactory = new ConnectionFactory();
     
        public final static String EXCHANGE_NAME = "direct_exchange";
        public final static String[] SEVERITY = {"info","warning","error"};
     
        static {
            Channel temp = getChannel();
            try {
                temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
     
        public static Channel getChannel(int channelNumber){
            try {
                Connection connection = connFactory.newConnection();
                return connection.createChannel(channelNumber);
            } catch (IOException e) {
                e.printStackTrace();
            }return null;
        }
     
        public static Channel getChannel(){
            try {
                Connection connection = connFactory.newConnection();
                return connection.createChannel();
            } catch (IOException e) {
                e.printStackTrace();
            }return null;
        }
     
        public static void  closeChannel(Channel channel) throws IOException {
            channel.close();
            channel.getConnection().close();
        }
     
    }
    复制代码

    确认定义:

    consumer只需要warning和error级别(routing)的日志消息:

    复制代码
    public static void main(String[] args) throws IOException, InterruptedException {
            Channel channel = ChannelFactory_.getChannel();
     
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning");
            channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error");
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName,true,consumer);
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();
     
                System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
            }
     
        }
    复制代码

    producer将所有级别的日志都发送出去:

    复制代码
    public static void main(String[] args) throws IOException {
            Channel channel = ChannelFactory_.getChannel();
            String content = "message "+new Date();
     
            for (int i = 0; i <ChannelFactory_.SEVERITY.length ; i++) {
                channel.basicPublish(EXCHANGE_NAME,ChannelFactory_.SEVERITY[i],null,content.getBytes());
            }
            ChannelFactory_.closeChannel(channel);
        }
    复制代码

    运行结果:

    direct exchange可以让我们有选择性地接受消息。
    但这样做仍然有缺陷。
    虽然我可以只要求error和warning级别的日志,但是我不能再进行细分。
    比如我只想要数据库相关的error和warning级别的日志。


    为了实现这一点,我们需要使用另一个exchange类型——Topic。
    exchange类型为topic时,routing key是一组用"."隔开的词,但仅限255bytes。
    比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"


    topic和direct的不同点还有在consumer中定义routing key时我们可以使用通配符,比如:
    符号'*':可以匹配某一个词。
    符号'#':可以匹配0~N个词。

    举个例子说明,假设我们用rounting key描述一个动物。
    格式为: <性格>.<颜色>.<种类>
    用符号'*',我想要得到桔***的动物,即:"*.orange.*"
    用符号'#',我想要得到懒散的动物,即:"lazy.#"
    如果使用过程中有人破坏了格式,即使rounting key为"lazy.orange.male.rabbit"也可以匹配"lazy.#"。


    稍微修改上面的代码,首先定义一个topic exchange。

    public  final static String EXCHANGE_NAME = "topic_exchange";
    temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

    确认定义:

    发送sql相关的log:

    复制代码
    public static void main(String[] args) throws IOException {
            Channel channel = ChannelFactory_.getChannel();
            String content = "message #$#$#$#$#$#$";
     
            channel.basicPublish(EXCHANGE_NAME,"warning.sql.connection.close",null,content.getBytes());
            channel.basicPublish(EXCHANGE_NAME,"error.sql.syntax",null,content.getBytes());
     
            ChannelFactory_.closeChannel(channel);
        }
    复制代码

    consumer接收所有sql相关的warning和所有error:

    复制代码
    public static void main(String[] args) throws IOException, InterruptedException {
            Channel channel = ChannelFactory_.getChannel();
     
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"warning.sql.#");
            channel.queueBind(queueName, ChannelFactory_.EXCHANGE_NAME,"error.#");
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName,true,consumer);
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();
     
                System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
            }
     
        }
    复制代码

    运行结果:

     
    分类: Frameworks
    标签: javarabbitmq
  • 相关阅读:
    [Learn AF3]第二章 App Framework 3.0的组件View——AF3的驱动引擎
    [Learn AF3]第一章 如何使用App Framework 3.0 构造应用程序
    [译]Intel App Framework 3.0的变化
    手机浏览器中屏蔽img的系统右键菜单context menu
    HTML5 touche vents drag to move & AF actionsheet by longTap
    HTML5 FileReader
    【转】Gulp入门基础教程
    【Intel AF 2.1 学习笔记三】
    【Intel AF 2.1 学习笔记二】AF中的页面——Panel
    【Intel AF 2.1 学习笔记一】AF程序结构
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4103631.html
Copyright © 2011-2022 走看看