zoukankan      html  css  js  c++  java
  • RabbitMQ学习03基本工作模式

    使用IDEA创建空的Maven项目。

    1、在POM文件中添加依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.yas</groupId>
        <artifactId>RabbitMQDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/junit/junit -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <!-- https://mvnrepository.com/artifact/log4j/log4j -->
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.14</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.21</version>
            </dependency>
        </dependencies>
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    </project>

    2、创建工具类,连接rabbitmq:

     1 package com.yas.config;
     2 
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5 
     6 import java.io.IOException;
     7 import java.util.concurrent.TimeoutException;
     8 
     9 public class RabbitMQClient {
    10     public static Connection getConnection(){
    11         //创建Connection工厂
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setHost("localhost");
    14         factory.setPort(5672);
    15         factory.setUsername("username");
    16         factory.setPassword("password");
    17         factory.setVirtualHost("/");
    18 
    19         //创建Connection
    20         Connection connection = null;
    21         try {
    22             connection = factory.newConnection();
    23         } catch (IOException e) {
    24             e.printStackTrace();
    25         } catch (TimeoutException e) {
    26             e.printStackTrace();
    27         }
    28         return connection;
    29     }
    30 }

    3、测试连接rabbitmq的代码:

     1 package com.yas.test;
     2 
     3 import com.rabbitmq.client.Connection;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Demo1 {
    10     //测试连接rabbitmq
    11     @Test
    12     public void getConnection() throws IOException {
    13         Connection connection = RabbitMQClient.getConnection();
    14         connection.close();
    15     }
    16 }

    4、Hello World模式:

    模式介绍:生产者将消息发送到队列中,消费者从队列中接受消息。

    生产者:1个

    交换机:1个(默认)

    队列:1个

    消费者:1个

    生产者代码:

     1 package com.yas.helloworld;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.yas.config.RabbitMQClient;
     6 import org.junit.Test;
     7 public class Publisher {
     8     @Test
     9     public void publish() throws  Exception{
    10         //1.获取连接对象
    11         Connection connection = RabbitMQClient.getConnection();
    12         //2.创建Channel
    13         Channel channel = connection.createChannel();
    14         //3.发布消息到exchange,同时指定路由规则
    15         String msg = "Hello World";
    16         //参数1:指定exchange,使用空字符串,表示默认exchange。
    17         //参数2:指定路由规则,使用具体的队列名称。
    18         //参数3:指定传递的消息所携带的properties。
    19         //参数4:指定发布的具体消息,字节数组类型byte[]
    20         channel.basicPublish("","HelloWorld",null,msg.getBytes());
    21         //注意:exchange是不会将消息持久化到本地的,Queue有持久化的功能。
    22         System.out.println("生产者发布消息成功");
    23         //4.释放资源
    24         channel.close();
    25         connection.close();
    26     }
    27 }

    消费者代码:

     1 package com.yas.helloworld;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("HelloWorld",true,false,false,null);
    25         //4.开启监听Queue
    26         DefaultConsumer consumer = new DefaultConsumer(channel){
    27             @Override
    28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29                 //super.handleDelivery(consumerTag, envelope, properties, body);
    30                 System.out.println("接受到消息:" + new String(body,"UTF-8"));
    31             }
    32         };
    33         //参数1:queue,指定消费哪个队列
    34         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    35         //参数3:consumer,指定消费回调
    36         channel.basicConsume("HelloWorld",true,consumer);
    37         System.out.println("消费者开始监听队列");
    38         System.in.read();
    39         //5/释放资源
    40         channel.close();
    41         connection.close();
    42     }
    43 }

    5、Work Queue模式:

    模式介绍:生产者将消息发送到队列中,多个消费者轮流从队列中取出消息,互不重复。

    生产者:1个

    交换机:1个(默认)

    队列:1个

    消费者:2个

    生产者代码:(与Hello World模式一样)

     1 package com.yas.workqueue;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.yas.config.RabbitMQClient;
     6 import org.junit.Test;
     7 
     8 public class Publisher {
     9     @Test
    10     public void publish() throws  Exception{
    11         //1.获取连接对象
    12         Connection connection = RabbitMQClient.getConnection();
    13         //2.创建Channel
    14         Channel channel = connection.createChannel();
    15         //3.发布消息到exchange,同时指定路由规则
    16         for(int i =0;i<10;i++) {
    17             String msg = "Work Queue" + i;
    18         //参数1:指定exchange,使用空字符串,表示默认exchange。
    19         //参数2:指定路由规则,使用具体的队列名称。
    20         //参数3:指定传递的消息所携带的properties。
    21         //参数4:指定发布的具体消息,字节数组类型byte[]
    22 
    23             channel.basicPublish("", "WorkQueue", null, msg.getBytes());
    24         }
    25         //注意:exchange是不会将消息持久化到本地的,Queue有持久化的功能。
    26         System.out.println("生产者发布消息成功");
    27         //4.释放资源
    28         channel.close();
    29         connection.close();
    30     }
    31 }

    消费者1代码:

     1 package com.yas.workqueue;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer1 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("WorkQueue",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者1接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("WorkQueue",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者1开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }

    消费者2代码:

     1 package com.yas.workqueue;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer2 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("WorkQueue",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34 
    35                 try {
    36                     Thread.sleep(200);//性能差
    37                 } catch (InterruptedException e) {
    38                     e.printStackTrace();
    39                 }
    40                 System.out.println("消费者2接受到消息:" + new String(body,"UTF-8"));
    41 
    42                 //通知RabbitMQ已经消费完成,可以消费下一个消息了
    43                 channel.basicAck(envelope.getDeliveryTag(),false);
    44             }
    45         };
    46         //参数1:queue,指定消费哪个队列
    47         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    48         //参数3:consumer,指定消费回调
    49 //        channel.basicConsume("WorkQueue",true,consumer);
    50         channel.basicConsume("WorkQueue",false,consumer);//配合3.5节,手动ack通知
    51 
    52         System.out.println("消费者2开始监听队列");
    53         System.in.read();
    54         //5/释放资源
    55         channel.close();
    56         connection.close();
    57     }
    58 }

    6、发布订阅模式:

    模式介绍:生产者将消息发送到fanout交换机中,交换机会将信息发送给所有队列,消费者从它监听的队列接受全部的信息。每个消费者受到的信息都是一样的。

    生产者:1个

    交换机:1个(自定义fanout)

    队列:2个

    消费者:2个

    生产者代码:

     1 package com.yas.pubsub;
     2 
     3 import com.rabbitmq.client.BuiltinExchangeType;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.yas.config.RabbitMQClient;
     7 import org.junit.Test;
     8 
     9 public class Publisher {
    10     @Test
    11     public void publish() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建Channel
    15         Channel channel = connection.createChannel();
    16 
    17         //2.5创建exchange
    18         //参数1:exchange的名称
    19         //参数2:exchange的类型
    20         //      BuiltinExchangeType.FANOUT  对应pub-sub模式
    21         //      BuiltinExchangeType.DIRECT  对应routing模式
    22         //      BuiltinExchangeType.TOPIC   对应topics模式
    23         channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
    24 
    25         //绑定某一个队列
    26         //参数1:quque的名称
    27         //参数2:exchange的名称
    28         //参数3:路由规则,对于发布订阅模式,没有规则
    29         channel.queueBind("pubsub-queue1","pubsub-exchange","");
    30         channel.queueBind("pubsub-queue2","pubsub-exchange","");
    31 
    32         //3.发布消息到exchange,同时指定路由规则
    33         for(int i =0;i<10;i++) {
    34             String msg = "PUBSUB Queue" + i;
    35         //参数1:指定exchange,使用空字符串,表示默认exchange。
    36         //参数2:指定路由规则,使用具体的队列名称。
    37         //参数3:指定传递的消息所携带的properties。
    38         //参数4:指定发布的具体消息,字节数组类型byte[]
    39             channel.basicPublish("pubsub-exchange", "PUBSUBQueue", null, msg.getBytes());
    40         }
    41         //注意:exchange是不会将消息持久化到本地的,Queue有持久化的功能。
    42         System.out.println("生产者发布消息成功");
    43         //4.释放资源
    44         channel.close();
    45         connection.close();
    46     }
    47 }

    消费者1代码:

     1 package com.yas.pubsub;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer1 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("pubsub-queue1",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者1接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("pubsub-queue1",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者1开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }

    消费者2代码:

     1 package com.yas.pubsub;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer2 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("pubsub-queue2",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者1接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("pubsub-queue2",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者1开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }

    7、路由模式:

    模式介绍:生产者将信息发送到direct交换机中,每个消息带一个routingKey与队列相对应。消息只能发送到匹配的队列上。消费者监听某个队列,只能接收与队列routingKey匹配的消息。

    生产者:1个

    交换机:1个(自定义direct)

    队列:2个

    消费者:2个

    生产者代码:

     1 package com.yas.routing;
     2 
     3 import com.rabbitmq.client.BuiltinExchangeType;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.yas.config.RabbitMQClient;
     7 import org.junit.Test;
     8 
     9 public class Publisher {
    10     @Test
    11     public void publish() throws Exception {
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建Channel
    15         Channel channel = connection.createChannel();
    16 
    17         //2.5创建exchange
    18         //参数1:exchange的名称
    19         //参数2:exchange的类型
    20         //      BuiltinExchangeType.FANOUT  对应pub-sub模式
    21         //      BuiltinExchangeType.DIRECT  对应routing模式
    22         //      BuiltinExchangeType.TOPIC   对应topics模式
    23         channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
    24 
    25         //绑定某一个队列
    26         //参数1:quque的名称
    27         //参数2:exchange的名称
    28         //参数3:路由规则,对于发布订阅模式,没有规则
    29         channel.queueBind("routing-queue-error", "routing-exchange", "ERROR");
    30         channel.queueBind("routing-queue-info", "routing-exchange", "INFO");
    31 
    32         //3.发布消息到exchange,同时指定路由规则
    33         String RoutingKey = "";
    34         for (int i = 0; i < 10; i++) {
    35             if (i < 3) {
    36                 RoutingKey = "ERROR";
    37             } else {
    38                 RoutingKey = "INFO";
    39             }
    40             String msg = "Routing Queue" + i;
    41             //参数1:指定exchange,使用空字符串,表示默认exchange。
    42             //参数2:指定路由规则,使用具体的队列名称。
    43             //参数3:指定传递的消息所携带的properties。
    44             //参数4:指定发布的具体消息,字节数组类型byte[]
    45             channel.basicPublish("routing-exchange", RoutingKey, null, msg.getBytes());
    46         }
    47         //注意:exchange是不会将消息持久化到本地的,Queue有持久化的功能。
    48         System.out.println("生产者发布消息成功");
    49         //4.释放资源
    50         channel.close();
    51         connection.close();
    52     }
    53 }

    消费者1代码:

     1 package com.yas.routing;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer1 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("routing-queue-error",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者error接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("routing-queue-error",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者error开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }

    消费者2代码:

     1 package com.yas.routing;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer2 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("routing-queue-info",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者info接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("routing-queue-info",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者info开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }

    8、主题模式:

    模式介绍:生产者将信息发送到topic交换机中,每个队列有一个主题的模式,每条消息包含具体的主题,消息会发送都所有能匹配的队列上。消费者监听队列,接受消息。

    生产者:1个

    交换机:1个(自定义topic)

    队列:2个

    消费者:2个

    生产者代码:

     1 package com.yas.topic;
     2 
     3 import com.rabbitmq.client.BuiltinExchangeType;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.yas.config.RabbitMQClient;
     7 import org.junit.Test;
     8 
     9 public class Publisher {
    10     @Test
    11     public void publish() throws Exception {
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建Channel
    15         Channel channel = connection.createChannel();
    16 
    17         //2.5创建exchange
    18         //参数1:exchange的名称
    19         //参数2:exchange的类型
    20         //      BuiltinExchangeType.FANOUT  对应pub-sub模式
    21         //      BuiltinExchangeType.DIRECT  对应routing模式
    22         //      BuiltinExchangeType.TOPIC   对应topics模式
    23         channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
    24 
    25         //绑定某一个队列
    26         //参数1:quque的名称
    27         //参数2:exchange的名称
    28         //参数3:路由规则,
    29         //动物信息:<speed> <color> <what>
    30         // 动物1:*.red.*   -> *占位符,代表1个
    31         // 动物2:fast.#    -> # 通配符,代表多个
    32         // 动物3:*.*.rabbit
    33         channel.queueBind("topic-queue-1", "topic-exchange", "*.red.*");
    34         channel.queueBind("topic-queue-2", "topic-exchange", "fast.#");
    35         channel.queueBind("topic-queue-2", "topic-exchange", "*.*.rabbit");
    36         //3.发布消息到exchange,同时指定路由规则
    37         String RoutingKey = "";
    38         String msg = "";
    39         for (int i = 0; i < 10; i++) {
    40             if (i < 3) {
    41                 RoutingKey = "fast.red.monkey";//fast匹配队列2,red匹配队列1
    42                 msg = "快红猴";
    43             } else if (i < 7) {
    44                 RoutingKey = "slow.white.rabbit";//rabbit匹配队列3
    45                 msg = "慢白兔";
    46             } else {
    47                 RoutingKey = "fast.blue.cat";//fast匹配队列2
    48                 msg = "快蓝猫";
    49             }
    50 
    51             //参数1:指定exchange,使用空字符串,表示默认exchange。
    52             //参数2:指定路由规则,使用具体的队列名称。
    53             //参数3:指定传递的消息所携带的properties。
    54             //参数4:指定发布的具体消息,字节数组类型byte[]
    55             channel.basicPublish("topic-exchange", RoutingKey, null, msg.getBytes());
    56         }
    57         //注意:exchange是不会将消息持久化到本地的,Queue有持久化的功能。
    58         System.out.println("生产者发布消息成功");
    59         //4.释放资源
    60         channel.close();
    61         connection.close();
    62     }
    63 }

    消费者1代码:

     1 package com.yas.topic;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer1 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("topic-queue-1",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者1接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("topic-queue-1",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者1开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }

    消费者2代码:

     1 package com.yas.topic;
     2 
     3 import com.rabbitmq.client.*;
     4 import com.yas.config.RabbitMQClient;
     5 import org.junit.Test;
     6 
     7 import java.io.IOException;
     8 
     9 public class Consumer2 {
    10     @Test
    11     public void consume() throws  Exception{
    12         //1.获取连接对象
    13         Connection connection = RabbitMQClient.getConnection();
    14         //2.创建channel
    15         Channel channel = connection.createChannel();
    16         //3.生命队列-Hello World
    17         //参数1:queue,队列名称
    18         //参数2:durable,当前队列是否需要持久化
    19         //参数3:exclusive,是否排外
    20         //      影响1:当connection.close()时,当前队列会被自动删除
    21         //      影响2:当前队列只能被一个消费者消费
    22         //参数4:autoDelete,如果这个队列没有消费者消费,队列自动删除
    23         //参数5:arguments,指定当前队列的其他信息
    24         channel.queueDeclare("topic-queue-2",true,false,false,null);
    25 
    26         //3.5指定当前消费者,一次消费多少个消息
    27         channel.basicQos(1);;
    28 
    29         //4.开启监听Queue
    30         DefaultConsumer consumer = new DefaultConsumer(channel){
    31             @Override
    32             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    33                 //super.handleDelivery(consumerTag, envelope, properties, body);
    34                 try {
    35                     Thread.sleep(50);//性能好
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39                 System.out.println("消费者2接受到消息:" + new String(body,"UTF-8"));
    40 
    41                 //通知RabbitMQ已经消费完成,向rabbitmq发送ack消息
    42                 channel.basicAck(envelope.getDeliveryTag(),false);
    43             }
    44         };
    45         //参数1:queue,指定消费哪个队列
    46         //参数2:deliverCallback,指定是否自动ACK,(true表示,接受到消息后,会立即通知RabbitMQ)
    47         //参数3:consumer,指定消费回调
    48 //        channel.basicConsume("WorkQueue",true,consumer);
    49         channel.basicConsume("topic-queue-2",false,consumer);//配合3.5节,手动ack通知
    50 
    51         System.out.println("消费者2开始监听队列");
    52         System.in.read();
    53         //5/释放资源
    54         channel.close();
    55         connection.close();
    56     }
    57 }
  • 相关阅读:
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:Cookies、会话和身份验证
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:表单处理
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:使用PHP访问MySQL
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:掌握MySQL
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:MySQL入门
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:实用PHP技术
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:PHP数组
    吴裕雄--天生自然PHP-MySQL-JavaScript学习笔记:PHP函数与对象
    (008)spring之BeanPostProcessor的执行时机及作用
    (007)spring容器获取ApplicationContext
  • 原文地址:https://www.cnblogs.com/asenyang/p/15500609.html
Copyright © 2011-2022 走看看