zoukankan      html  css  js  c++  java
  • 初识RabbitMQ

      好久没有更新技术博客了,今天正好有点时间,自学了一下RabbitMQ,现在就将我所学到的东西分享给大家,让你们也能同我一起进步,在程序员的技术道路上,一直前进下去。

      首先学习技术,我觉得还是能在官网去学习,这样才能学到最权威,最新的技术。毕竟是初学者,如果就一味的通过百度去学习技术,有可能会遇到不断的坑,只看别人写的博客,对于初学者来说,又不知对否,只能一味的接纳,最终会遇到不可预料的坑,都不知道如何下手,所以我建议大家能去官网学习,虽然说官网都是英文的,对于一些英语能力不好的同学,可能看的会有点头疼,但是只要自己坚持下去,一直看英文文档,遇到不懂的词,可以查一下,这样日积月累,达到从量变到质变的过程,最终再看什么英文文档,都不在话下了。好了,接下来,进入正题。

    一、安装RabbitMQ

    链接: https://pan.baidu.com/s/1zsNlA1IB0o05AEHNjijxGA 提取码: 2sph 

    可以在这里下载安装文档

    二、6种队列模式

    RabbitMQ的官网地址是http://www.rabbitmq.com,进入官网教程(RabbitMQ Tutorials),看到有6个模式:

    1、简单的队列模式

    这个图是官网上面的图,P是生产者,是发送消息的一方,C是消费者,是接收消息的一方,可以把RabbitMQ和邮局类比,当你想要寄信的时候,你会把信放在邮箱里,然后等待邮差把信收走,然后送到想要寄给的那个人,唯一不同点就是邮局是处理纸质信件,然而RabbitMQ是处理二进制数据。有三个术语,生产者(producer)是发送消息的一方,队列(queue)相当于邮箱,存储消息,消费者(consumer)是接收消息的一方,注意:生产者、消费者、队列可以不再同一台机器上,可以分布在不同的机器,一个应用既可以是生产者,也可以是消费者。

    接下来开始用代码实现简单队列模式:

    使用maven项目完成这个例子,先在pom.xml文件里添加rabbitmq的依赖

    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.4</version>
    </dependency>

    ①建一个工具类RabbitMQConnection,方便连接RabbitMQ
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    /**
    * 连接rabbitmq
    */
    public class RabbitMQConnection {

    private static Connection connection;

    static {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    try{
    /**
    * rabbitmq服务器
    */
    connectionFactory.setHost("127.0.0.1");
    /**
    * 虚拟主机名
    */
    connectionFactory.setVirtualHost("/testRabbitMQ");
    /**
    * 用户名
    */
    connectionFactory.setUsername("renruibin");
    /**
    * 密码
    */
    connectionFactory.setPassword("111111");
    /**
    * 默认端口
    */
    connectionFactory.setPort(5672);
    connection = connectionFactory.newConnection();
    } catch (Exception e){
    e.printStackTrace();
    }
    }

    /**
    * 获取连接
    * @return
    */
    public static Connection getConnection(){
    return connection;
    }
    }
    ②建立生产者,发送消息
    /**
    * 生产者
    */
    public class SendMessage {

    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、发送消息
    * 第一个参数:交换机名字
    * 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
    * 第三个参数:可以添加额外的配置
    * 第四个参数:二进制的消息数据
    */
    String message = "第"+new Random().nextInt()+"条消息";
    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    System.out.println("发送"+message+"成功");
    }
    }
    执行这段程序之后,会发送一条消息到RabbitMQ服务器的队列里,可以访问localhost:15672查看
    目前有一个连接

    目前有一个管道

    目前有一个队列,而且有一条消息等待被消费

    ③建立消费者,去消费这条消息

    public class ReceiveMessage {

    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,true,consumer);
    /**
    * 6、开始消费数据
    */
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("收到"+new String(delivery.getBody())+"成功");
    }
    }
    执行这段程序之后,会消费simple_queue队列里的一条消息,看一下控制台的情况

    可以看到消息已经被消费了,simple_queue队列里的消息已经空了

    总结:简单队列模式就是生产者发送一条消息到队列里,消费者从队列里获取消息,不会经过交换机,也没有路由规则,这种模式是P2P的,也就是点对点,一个生产者只能对应一个消费者,不支持一个生产者对应多个消费者。

    2、工作模式

    顾名思义:工作模式的意思是这个模式跟工作一样,员工分工合作,每人干的活都是一样多的,没有竞争,就相当于多个消费者都是获取的消息是不一样的,所有消费者消费的消息之和为总的消息,与kafka的consumer group很类似,也是同一个组内的消费者获取的消息互斥,之和为总的消息。

    ①建立生产者,去生产一批消息

    /**
    * 生产者
    */
    public class SendMessage {

    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    /**
    * 4、发送消息
    * 第一个参数:交换机名字
    * 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
    * 第三个参数:可以添加额外的配置
    * 第四个参数:二进制的消息数据
    */
    for(int i=0;i<100;i++){
    String message = "第"+i+"条消息";
    channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    System.out.println("发送"+message+"成功");
    }
    channel.close();
    connection.close();
    }
    }
    ②建立消费者1,睡眠10毫秒,模拟工作能力快
    public class ReceiveMessage1 {

    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    /**
    * 6、开始消费数据
    */
    while (true){
    Thread.sleep(10);
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    ③建立消费者2,睡眠1000毫秒,模拟工作能力差
    public class ReceiveMessage2 {

    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,true,false,false,null);
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    /**
    * 6、开始消费数据
    */
    while (true){
    Thread.sleep(1000);
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者2收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    3、发布订阅模式

    此种模式是生产者把消息发送到交换机(X)上,不用管是发送那哪个队列上了,消费者得把自己消费的那个队列与生产者的交换机进行绑定,通过一定的路由规则,才能把消息获取到。

    ①建立生产者,发送消息

    /**
    * 生产者(发布订阅模式)
    */
    public class SendMessage {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "ps_exchange";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明交换机,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:交换机名称
    * 第二个参数:交换机类型 fanout direct headers topic
    */
    channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    /**
    * 4、发送消息
    * 第一个参数:交换机名字
    * 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
    * 第三个参数:可以添加额外的配置
    * 第四个参数:二进制的消息数据
    */
    for(int i=0;i<100;i++){
    String message = "第"+i+"条消息";
    channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
    System.out.println("发送"+message+"成功");
    }
    channel.close();
    connection.close();
    }
    }
    执行这段程序以后,会发送100条消息,但是该交换机没有任何队列绑定,所以消息是会丢失的
    ②建立消费者1
    public class ReceiveMessage1 {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "ps_exchange";
    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "ps_queue_1";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、队列和交换机绑定
    * 第一个参数:队列名称
    * 第二个参数:交换机名称
    * 第三个参数:路由key
    */
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    /**
    * 6、开始消费数据
    */
    while (true){
    Thread.sleep(10);
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    ③建立消费者2public class ReceiveMessage2 {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "ps_exchange";
    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "ps_queue_2";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、队列和交换机绑定
    * 第一个参数:队列名称
    * 第二个参数:交换机名称
    * 第三个参数:路由key
    */
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    /**
    * 6、开始消费数据
    */
    while (true){
    Thread.sleep(10);
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者2收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    先把两个消费者启动,然后再启动一次生产者,可以看到两个消费者消费了同样的消息。消费者这块的代码与之前的不同点就是,增加了队列的绑定 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    消费者1消费的消息:
                            

    消费者1收到第0条消息成功
    消费者1收到第1条消息成功
    消费者1收到第2条消息成功
    消费者1收到第3条消息成功
    消费者1收到第4条消息成功
    消费者1收到第5条消息成功
    消费者1收到第6条消息成功
    消费者1收到第7条消息成功
    消费者1收到第8条消息成功
    消费者1收到第9条消息成功
    消费者1收到第10条消息成功

      

     消费者2消费的消息:

    消费者2收到第0条消息成功
    消费者2收到第1条消息成功
    消费者2收到第2条消息成功
    消费者2收到第3条消息成功
    消费者2收到第4条消息成功
    消费者2收到第5条消息成功
    消费者2收到第6条消息成功
    消费者2收到第7条消息成功
    消费者2收到第8条消息成功
    消费者2收到第9条消息成功
    消费者2收到第10条消息成功

    可以在管控台看到多了一个交换机

    看到多了两个队列

    4、路由模式

    这种模式是路由模式,有一定的路由规则,只有符合这个路由规则的队列,才可以消费消息,可以达到消费者自主选择想要消费的消息,不用一味的获取生产者发送的所有消息。

    ①建立生产者,发送一条消息

    /**
    * 生产者(路由模式)
    */
    public class SendMessage {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明交换机,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:交换机名称
    * 第二个参数:交换机类型 fanout direct headers topic
    */
    channel.exchangeDeclare(EXCHANGE_NAME,"direct");

    /**
    * 4、发送消息
    * 第一个参数:交换机名字
    * 第二个参数:路由key
    * 第三个参数:可以添加额外的配置
    * 第四个参数:二进制的消息数据
    */
    String message = "warn日志";
    channel.basicPublish(EXCHANGE_NAME,"warn",null,message.getBytes());
    System.out.println("发送"+message+"成功");
    channel.close();
    connection.close();
    }
    }
    执行完这段程序,会建立一个direct类型的交换机,并且指定的路由key是warn,在管控台可以看到多了一个交换机

    此时还没有队列绑定,所以发送的消息是会丢失的

    ②建立一个消费者1,消费error的消息

    public class ReceiveMessage1 {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "direct_exchange";
    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "direct_queue_1";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、队列和交换机绑定
    * 第一个参数:队列名称
    * 第二个参数:交换机名称
    * 第三个参数:路由key
    */
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    while (true){
    /**
    * 6、开始消费数据
    */
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    ③建立消费者2,消费info和warn的消息
    public class ReceiveMessage2 {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "direct_exchange";
    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "direct_queue_2";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、队列和交换机绑定
    * 第一个参数:队列名称
    * 第二个参数:交换机名称
    * 第三个参数:路由key
    */
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warn");
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    while (true){
    /**
    * 6、开始消费数据
    */
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者2收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    重新执行生产者,生产一条warn的消息
    生产者:发送warn日志成功
    消费者1:
    消费者2:消费者2收到warn日志成功
    重新执行生产者,生产一条info的消息
    生产者:发送info日志成功
    消费者1:
    消费者2:消费者2收到info日志成功

    重新执行生产者,生产一条error的消息
    生产者:发送error日志成功
    消费者1:消费者1收到error日志成功
    消费者2:
    5、主题模式

     此种模式是之前两种模式的一种升级,消费者可以很自由的获取到自己想要的消息

    • * (star) can substitute for exactly one word.      一个
    • # (hash) can substitute for zero or more words. 0个或多个

     

    如果把绑定key设为#,此种模式就相当于fanout,如果在绑定key里没有使用*和#,此种模式就相当于direct,所以topic模式非常的灵活

    ①建立生产者,发送消息

    /**
    * 生产者(路由模式)
    */
    public class SendMessage {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明交换机,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:交换机名称
    * 第二个参数:交换机类型 fanout direct headers topic
    */
    channel.exchangeDeclare(EXCHANGE_NAME,"topic");

    /**
    * 4、发送消息
    * 第一个参数:交换机名字
    * 第二个参数:路由key
    * 第三个参数:可以添加额外的配置
    * 第四个参数:二进制的消息数据
    */
    String message = "quick.orange.rabbit";
    channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit",null,message.getBytes());
    System.out.println("发送"+message+"成功");
    channel.close();
    connection.close();
    }
    }
    执行这段程序,会建立一个topic类型的交换机,在管控台可以看到多了一个交换机

    但是此时没有队列绑定,所以消息会丢失。

    ②建立消费者1,路由key为*.orange.*

    public class ReceiveMessage1 {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "topic_exchange";
    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "topic_queue_1";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、队列和交换机绑定
    * 第一个参数:队列名称
    * 第二个参数:交换机名称
    * 第三个参数:路由key
    */
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    while (true){
    /**
    * 6、开始消费数据
    */
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    ③建立消费者2,路由key是*.*.rabbit,lazy.#
    public class ReceiveMessage2 {

    /**
    * 定义交换机名称
    */
    private static final String EXCHANGE_NAME = "topic_exchange";
    /**
    * 定义队列名称
    */
    private static final String QUEUE_NAME = "topic_queue_2";

    public static void main(String[] args) throws Exception{
    /**
    * 1、获取连接,相当于数据库连接
    */
    Connection connection = RabbitMQConnection.getConnection();
    /**
    * 2、创建通道,相当于statement
    */
    Channel channel = connection.createChannel();
    /**
    * 3、声明队列,如果没有,就创建(Declaring a queue is idempotent - it will only be created if it doesn't exist already. )
    * 第一个参数:队列名称
    * 第二个参数:是否持久化
    * 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
    * 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    /**
    * 4、队列和交换机绑定
    * 第一个参数:队列名称
    * 第二个参数:交换机名称
    * 第三个参数:路由key
    */
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
    /**
    * 4、定义队列的消费者
    */
    QueueingConsumer consumer = new QueueingConsumer(channel);
    /**
    * 5、监听队列,设置成自动确认接收成功
    * 第一个参数:队列名称
    * 第二个参数:是否自动ACK
    * 第三个参数:消费者
    */
    channel.basicConsume(QUEUE_NAME,false,consumer);
    /**
    * 避免不公平,实现多劳多得
    * 意思是每次获取一条消息,确认以后,才可以再次获取消息
    */
    channel.basicQos(1);
    while (true){
    /**
    * 6、开始消费数据
    */
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    System.out.println("消费者1收到"+new String(delivery.getBody())+"成功");
    /**
    * 7、确认消息已被消费
    * 第一个参数:deliveryTag
    * 第二个参数:multiple
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    }
    }
    重新执行生产者发送消息,生产一条quick.orange.rabbit消息
    生产者:发送quick.orange.rabbit成功
    消费者1:消费者1收到quick.orange.rabbit成功
    消费者2:消费者2收到quick.orange.rabbit成功

    重新执行生产者发送消息,生产一条lazy.orange.elephant消息
    生产者:发送lazy.orange.elephant成功
    消费者1:消费者1收到lazy.orange.elephant成功
    消费者2:消费者2收到lazy.orange.elephant成功

    重新执行生产者发送消息,生产一条quick.orange.fox消息
    生产者:发送quick.orange.fox成功
    消费者1:消费者1收到quick.orange.fox成功
    消费者2:

    重新执行生产者发送消息,生产一条lazy.pink.rabbit消息
    生产者:发送lazy.pink.rabbit成功
    消费者1:
    消费者2:消费者2收到lazy.pink.rabbit成功

    重新执行生产者发送消息,生产一条quick.brown.fox消息
    生产者:发送quick.brown.fox成功
    消费者1:
    消费者2:
    6、RPC模式

    这种模式不常用,因为有专业的技术,去实现,所以就没有必要去学习这个模式了。过几天再研究一下RabbitMQ的集群模式,再来跟大家分享。

  • 相关阅读:
    wpf中显示HTML(转自http://steeven.cnblogs.com/archive/2006/06/12/424258.html)
    【msdn wpf forum翻译】TextBox中文本 中对齐 的方法
    【msdn wpf forum翻译】TextBlock等类型的默认样式(implicit style)为何有时不起作用?
    《Applications=Code+Markup》读书笔记 1(第一章 初识Application和Window)
    sql 分页
    Api
    快钱接口
    c#经典面试题
    static/const/readonly
    静态构造函数和静态类
  • 原文地址:https://www.cnblogs.com/rrb520/p/10138856.html
Copyright © 2011-2022 走看看