zoukankan      html  css  js  c++  java
  • fanout(Publish/Subscribe)发布/订阅

    引言

    它是一种通过广播方式发送消息的路由器,所有和exchange建立的绑定关系的队列都会接收到消息

    不处理路由键,只需要简单的将队列绑定到交换机上

    fanout交换机转发消息是最快的,它不需要做路由键的匹配

    1.模型

    2.Exchange

    Exchange在我们的工作模型中首次出现,因此需要详细介绍下。

    Exchange:接收消息,并根据路由键转发消息到所绑定的队列。

    Exchange分为4种类型:

    Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
    Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
    Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

    交换机属性:

    Durable:是否需要持久化,true为持久化

    AutoDelete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange

    Internal:当前Exchange是否用于Rabbitmq的内部使用,默认是false(用Erlang语言进行内部功能扩展会使用到)

    Arguments:扩展参数,用于扩展AMQP协议自定义使用

    今天我们的实例采用fanout类型的exchange。

    3.Queue-消息队列

    消息队列,实际存储消息数据
    Durability:是否持久化,Durable:true,Transient:false
    AutoDelete:true代表当最后一个监听被移除之后,该Queue会自动被删除。

    通过RabbitMQ 接口可以自己去生成临时队列,队列名字也由RabbitMQ自动生成。通过

    可以声明一个非持久的、通道独占的、自动删除的队列,getQueue()方法可以获取随机队列名字。这个名字用来在队列和exchange之间建立binding关系的时候使用:

    4.Message-消息

    服务器和应用程序之间传递的数据
    本质上就是一段数据,由Properties和Payload(Body)组成
    常用属性:
    delivery mode:投递模式,2表示持久化投递
    headers:自定义属性
    content_type:消息内容格式
    content_encoding:编码格式
    priority:优先级

      reply_to:一般用于RPC过程,消息处理返回结果队列;
      correlation_id:用于关联RPC的请求与应答;

    expiration:过期时间
    message_id:消息id

    5.创建生产者

    package com.dwz.rabbitmq.exchange.fanout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_fanout_exchange";
            
            String msg = "hello rabbitmq fanout message successs!--";
            for(int i = 0; i < 50; i++) {
                channel.basicPublish(exchangeName, "", null, (msg + i).getBytes());
            }
            
            channel.close();
            connection.close();
        }
    }

    6.创建消费者1

    package com.dwz.rabbitmq.exchange.fanout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            
            //交换机名称
            String exchangeName = "test_fanout_exchange";
            //交换机类型
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue_1";
            //路由键
            String routingKey = "";
            //交换机声明
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
            /*
             * durable:持久化,服务器重启,数据仍然存在
             * exclusive:独占一个channel,用于顺序消费,类似于加了一把锁
             * autoDelete:如果一个队列脱离了exchange会自动删除
             */
            //队列声明
            channel.queueDeclare(queueName, false, false, false, null);
            //队列绑定交换机
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("rec1--message:" + msg);
                }
            };
            //设置消费者
            channel.basicConsume(queueName, true, consumer);
        }
    }

    7.创建消费者2

    package com.dwz.rabbitmq.exchange.fanout;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Consumer2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue_2";
            String routingKey = "";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
            /*
             * durable:持久化,服务器重启,数据仍然存在
             * exclusive:独占一个channel,用于顺序消费,类似于加了一把锁
             * autoDelete:如果一个队列脱离了exchange会自动删除
             */
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("rec2--message:" + msg);
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }

    8.运行代码

    先启动两个消费者,再启动生产者,运行结果:

    两个消费者都拿到exchange(交换机)的全部信息

    注:由于exchange只是做一个消息转发,本身是不具备存储数据的能力,如果先启动生产者,信息是到达不了队列的,队列有存储数据的功能

     参考文章:

    RabbitMQ学习笔记

  • 相关阅读:
    perfnet错误 事件ID:2004 无法打开服务器服务。服务器性能数据将不会被返回。
    尝试加载 Oracle 客户端库时引发 BadImageFormatException。如果在安装 32 位 Oracle 客户端组件的情况下以 64 位模式运行,将出现此问题
    oracle从dmp文件做数据恢复
    python预科5--函数及lambda匿名函数
    pytest 运行SyntaxError: invalid syntax
    java自动化--testNG集成extentreports(好坑,编辑的时候样式好的,但是发布了就这鬼样子还不能上图)
    git 命令
    java 5
    java JsonMapper
    java springboot mybatis整合
  • 原文地址:https://www.cnblogs.com/zheaven/p/11801106.html
Copyright © 2011-2022 走看看