zoukankan      html  css  js  c++  java
  • RabbitMQ初学之二:直接发送消息到队列

    一. 背景

      总前提:队列无论是在生产者声明还是在消费者声明,只有声明了,才能在RabbitMQ的管理界面看到该队列

      生产者直接发送消息到队列,消费者直接消费队列中的消息,而不用指定exchange并绑定。这种需求下,分三种情况:① 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列;② 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息;③ 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。下面分别进行说明:

    1. 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列,但是此时,声明队列的一方要先运行,否则消费者连不上队列,要报错

      ① 生产者代码

     1 import java.io.IOException;
     2 import com.rabbitmq.client.Channel;
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5  
     6 public class Producer {
     7     private final static String QUEUE_NAME = "QUEUE1";  
     8  
     9     public static void main(String[] args) throws IOException {  
    10         ConnectionFactory factory = new ConnectionFactory();  
    11         factory.setHost("localhost");
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setPort(5672);
    15         Connection connection = factory.newConnection();  
    16         Channel channel = connection.createChannel();  
    17  
    18         // 声明队列
    19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
    20         String message = "Hello World!";  
    21         
    22         // 发行消息到队列
    23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
    24         System.out.println(" [x] Sent '" + message + "'");  
    25  
    26         channel.close();  
    27         connection.close();  
    28     }  
    29 }

      2. 消费者

     1 import com.rabbitmq.client.ConnectionFactory;  
     2 import com.rabbitmq.client.QueueingConsumer;  
     3 import com.rabbitmq.client.Channel;  
     4 import com.rabbitmq.client.Connection;  
     5 
     6 public class Reqv {
     7     
     8     private final static String QUEUE_NAME = "QUEUE1";  
     9       
    10     public static void main(String[] argv) throws Exception {  
    11         
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setHost("localhost");
    16         factory.setPort(5672);  
    17         
    18         Connection connection = factory.newConnection();  
    19         Channel channel = connection.createChannel();  
    20         
    21         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    22         
    23         QueueingConsumer consumer = new QueueingConsumer(channel);  
    24         
    25         // 消费者不声明队列,直接从队列中消费
    26         channel.basicConsume(QUEUE_NAME, true, consumer);  
    27         while(true){  
    28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    29             String message = new String(delivery.getBody(),"UTF-8");  
    30             System.out.println(" 【[x] Received 】:" + message);  
    31         }  
    32     }  
    33 }

    2.  生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息,生产者可先运行)(不报错),但是发的消息无效(被丢弃),只有声明队列的一方运行后,在管理界面才能看到该队列

      ① 生产者

     1 import java.io.IOException;
     2 import com.rabbitmq.client.Channel;
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5  
     6 public class Producer {
     7     private final static String QUEUE_NAME = "QUEUE2";  
     8  
     9     public static void main(String[] args) throws IOException {  
    10         ConnectionFactory factory = new ConnectionFactory();  
    11         factory.setHost("localhost");
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setPort(5672);
    15         Connection connection = factory.newConnection();  
    16         Channel channel = connection.createChannel();  
    17  
    18         String message = "Hello World!";  
    19         
    20         // 发行消息到队列
    21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
    22         System.out.println(" [x] Sent '" + message + "'");  
    23  
    24         channel.close();  
    25         connection.close();  
    26     }  
    27 }

      ② 消费者

     1 import com.rabbitmq.client.ConnectionFactory;  
     2 import com.rabbitmq.client.QueueingConsumer;  
     3 import com.rabbitmq.client.Channel;  
     4 import com.rabbitmq.client.Connection;  
     5 
     6 public class Reqv {
     7     
     8     private final static String QUEUE_NAME = "QUEUE2";  
     9       
    10     public static void main(String[] argv) throws Exception {  
    11         
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setHost("localhost");
    16         factory.setPort(5672);  
    17         
    18         Connection connection = factory.newConnection();  
    19         Channel channel = connection.createChannel();  
    20         
    21         // 声明队列
    22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    24         
    25         QueueingConsumer consumer = new QueueingConsumer(channel);  
    26         
    27         // 消费者不声明队列,直接从队列中消费
    28         channel.basicConsume(QUEUE_NAME, true, consumer);  
    29         while(true){  
    30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    31             String message = new String(delivery.getBody(),"UTF-8");  
    32             System.out.println(" 【[x] Received 】:" + message);  
    33         }  
    34     }  
    35 }

    3. 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。

      ① 生产者

     1 import java.io.IOException;
     2 import com.rabbitmq.client.Channel;
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5  
     6 public class Producer {
     7     private final static String QUEUE_NAME = "QUEUE2";  
     8  
     9     public static void main(String[] args) throws IOException {  
    10         ConnectionFactory factory = new ConnectionFactory();  
    11         factory.setHost("localhost");
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setPort(5672);
    15         Connection connection = factory.newConnection();  
    16         Channel channel = connection.createChannel();  
    17  
    18         // 声明队列
    19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
    20         String message = "Hello World!";  
    21         
    22         // 发行消息到队列
    23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
    24         System.out.println(" [x] Sent '" + message + "'");  
    25  
    26         channel.close();  
    27         connection.close();  
    28     }  
    29 }

      ② 消费者

     1 import com.rabbitmq.client.ConnectionFactory;  
     2 import com.rabbitmq.client.QueueingConsumer;  
     3 import com.rabbitmq.client.Channel;  
     4 import com.rabbitmq.client.Connection;  
     5 
     6 public class Reqv {
     7     
     8     private final static String QUEUE_NAME = "QUEUE2";  
     9       
    10     public static void main(String[] argv) throws Exception {  
    11         
    12         ConnectionFactory factory = new ConnectionFactory();
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setHost("localhost");
    16         factory.setPort(5672);  
    17         
    18         Connection connection = factory.newConnection();  
    19         Channel channel = connection.createChannel();  
    20         
    21         // 声明队列
    22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    24         
    25         QueueingConsumer consumer = new QueueingConsumer(channel);  
    26         
    27         // 消费者不声明队列,直接从队列中消费
    28         channel.basicConsume(QUEUE_NAME, true, consumer);  
    29         while(true){  
    30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    31             String message = new String(delivery.getBody(),"UTF-8");  
    32             System.out.println(" 【[x] Received 】:" + message);  
    33         }  
    34     }  
    35 }
  • 相关阅读:
    zzbank oneOpencloud Env linuxaix6.1 interactiveMaintain(nfs,aix genintall基于系统iso光盘,aix6.1 puppet-Agent,Cent6.4 puppetServer,agent time no syn case Er)
    openStack core service Components Ins shell scripts and simple provision
    openStack deep dive,Retake Policy
    openStack开源云repo db local or on-line 实战部署之Ruiy王者归来
    Power Network (poj 1459 网络流)
    Eclipse.ini參数设置(Maven Integration for Eclipse JDK Warning)
    移动三大平台和三大开发模式对照分析
    Android动态设置字体颜色
    Windows 8提升普通管理员权限为超级管理员权限以及激活超级管理员Administrator
    CDN具体解释(篇一)
  • 原文地址:https://www.cnblogs.com/tjudzj/p/9250643.html
Copyright © 2011-2022 走看看