zoukankan      html  css  js  c++  java
  • RabbitMQ初学之二:直接发送消息到队列

    一. 背景

      总前提:队列无论是在生产者声明还是在消费者声明,只有声明了,才能在RabbitMQ的管理界面看到该队列

      生产者直接发送消息到队列,消费者直接消费队列中的消息,而不用指定exchange并绑定。这种需求下,分三种情况:① 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列;② 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息;③ 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。下面分别进行说明:

    1. 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列,但是此时,声明队列的一方要先运行,否则消费者连不上队列,要报错

      ① 生产者代码

     1 import java.io.IOException;
     2 import com.rabbitmq.client.Channel;
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5  
     6 public class Producer {
     7     private final static String QUEUE_NAME = "QUEUE1";  
     8  
     9     public static void main(String[] args) throws IOException {  
    10         ConnectionFactory factory = new ConnectionFactory();  
    11         factory.setHost("localhost");
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setPort(5672);
    15         Connection connection = factory.newConnection();  
    16         Channel channel = connection.createChannel();  
    17  
    18         // 声明队列
    19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
    20         String message = "Hello World!";  
    21         
    22         // 发行消息到队列
    23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
    24         System.out.println(" [x] Sent '" + message + "'");  
    25  
    26         channel.close();  
    27         connection.close();  
    28     }  
    29 }

      2. 消费者

     1 import com.rabbitmq.client.ConnectionFactory;  
     2 import com.rabbitmq.client.QueueingConsumer;  
     3 import com.rabbitmq.client.Channel;  
     4 import com.rabbitmq.client.Connection;  
     5 
     6 public class Reqv {
     7     
     8     private final static String QUEUE_NAME = "QUEUE1";  
     9       
    10     public static void main(String[] argv) throws Exception {  
    11         
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setHost("localhost");
    16         factory.setPort(5672);  
    17         
    18         Connection connection = factory.newConnection();  
    19         Channel channel = connection.createChannel();  
    20         
    21         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    22         
    23         QueueingConsumer consumer = new QueueingConsumer(channel);  
    24         
    25         // 消费者不声明队列,直接从队列中消费
    26         channel.basicConsume(QUEUE_NAME, true, consumer);  
    27         while(true){  
    28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    29             String message = new String(delivery.getBody(),"UTF-8");  
    30             System.out.println(" 【[x] Received 】:" + message);  
    31         }  
    32     }  
    33 }

    2.  生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息,生产者可先运行)(不报错),但是发的消息无效(被丢弃),只有声明队列的一方运行后,在管理界面才能看到该队列

      ① 生产者

     1 import java.io.IOException;
     2 import com.rabbitmq.client.Channel;
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5  
     6 public class Producer {
     7     private final static String QUEUE_NAME = "QUEUE2";  
     8  
     9     public static void main(String[] args) throws IOException {  
    10         ConnectionFactory factory = new ConnectionFactory();  
    11         factory.setHost("localhost");
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setPort(5672);
    15         Connection connection = factory.newConnection();  
    16         Channel channel = connection.createChannel();  
    17  
    18         String message = "Hello World!";  
    19         
    20         // 发行消息到队列
    21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
    22         System.out.println(" [x] Sent '" + message + "'");  
    23  
    24         channel.close();  
    25         connection.close();  
    26     }  
    27 }

      ② 消费者

     1 import com.rabbitmq.client.ConnectionFactory;  
     2 import com.rabbitmq.client.QueueingConsumer;  
     3 import com.rabbitmq.client.Channel;  
     4 import com.rabbitmq.client.Connection;  
     5 
     6 public class Reqv {
     7     
     8     private final static String QUEUE_NAME = "QUEUE2";  
     9       
    10     public static void main(String[] argv) throws Exception {  
    11         
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setHost("localhost");
    16         factory.setPort(5672);  
    17         
    18         Connection connection = factory.newConnection();  
    19         Channel channel = connection.createChannel();  
    20         
    21         // 声明队列
    22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    24         
    25         QueueingConsumer consumer = new QueueingConsumer(channel);  
    26         
    27         // 消费者不声明队列,直接从队列中消费
    28         channel.basicConsume(QUEUE_NAME, true, consumer);  
    29         while(true){  
    30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    31             String message = new String(delivery.getBody(),"UTF-8");  
    32             System.out.println(" 【[x] Received 】:" + message);  
    33         }  
    34     }  
    35 }

    3. 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。

      ① 生产者

     1 import java.io.IOException;
     2 import com.rabbitmq.client.Channel;
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5  
     6 public class Producer {
     7     private final static String QUEUE_NAME = "QUEUE2";  
     8  
     9     public static void main(String[] args) throws IOException {  
    10         ConnectionFactory factory = new ConnectionFactory();  
    11         factory.setHost("localhost");
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setPort(5672);
    15         Connection connection = factory.newConnection();  
    16         Channel channel = connection.createChannel();  
    17  
    18         // 声明队列
    19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
    20         String message = "Hello World!";  
    21         
    22         // 发行消息到队列
    23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
    24         System.out.println(" [x] Sent '" + message + "'");  
    25  
    26         channel.close();  
    27         connection.close();  
    28     }  
    29 }

      ② 消费者

     1 import com.rabbitmq.client.ConnectionFactory;  
     2 import com.rabbitmq.client.QueueingConsumer;  
     3 import com.rabbitmq.client.Channel;  
     4 import com.rabbitmq.client.Connection;  
     5 
     6 public class Reqv {
     7     
     8     private final static String QUEUE_NAME = "QUEUE2";  
     9       
    10     public static void main(String[] argv) throws Exception {  
    11         
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setHost("localhost");
    16         factory.setPort(5672);  
    17         
    18         Connection connection = factory.newConnection();  
    19         Channel channel = connection.createChannel();  
    20         
    21         // 声明队列
    22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    24         
    25         QueueingConsumer consumer = new QueueingConsumer(channel);  
    26         
    27         // 消费者不声明队列,直接从队列中消费
    28         channel.basicConsume(QUEUE_NAME, true, consumer);  
    29         while(true){  
    30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    31             String message = new String(delivery.getBody(),"UTF-8");  
    32             System.out.println(" 【[x] Received 】:" + message);  
    33         }  
    34     }  
    35 }
  • 相关阅读:
    547. Friend Circles
    399. Evaluate Division
    684. Redundant Connection
    327. Count of Range Sum
    LeetCode 130 被围绕的区域
    LeetCode 696 计数二进制子串
    LeetCode 116 填充每个节点的下一个右侧节点
    LeetCode 101 对称二叉树
    LeetCode 111 二叉树最小深度
    LeetCode 59 螺旋矩阵II
  • 原文地址:https://www.cnblogs.com/tjudzj/p/9250643.html
Copyright © 2011-2022 走看看