zoukankan      html  css  js  c++  java
  • rabbitmq系列一 之简单队列

    1、 rabbitmq简介

      rabbitmq是一个消息代理,或者讲是一个消息中间件。主要是用来接收和转发信息的,它是对消息不做任何处理的。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

      用来发送消息的程序我们称为生产者:

                                              

      用来存储信息的我们称为队列,队列只受到内存和磁盘的限制,是一个大的消息缓存区。生产者可以发送消息到队列中,而消费者从队列中接收消息。

                              

      等待接收消息的程序我们称为消费者:

                                   

       下面图中, “P”是生产者, “C”是消费者,中间的框是队列——代表是消息的缓冲区。

                      

      

      下面我们开始我们的简单程序hello world

            

    2 、sending(生产者)

                           

      生产者的代码如下:

     1 package rabbitmq.main;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 import rabbitmq.utils.ConnectionUtils;
    10 
    11 public class Send {
    12 
    13     private static  final String QUEUE_NAME = "rabbitmq_queue";
    14     public static void main(String[] args) throws IOException, TimeoutException {
    15         //获取一个连接
    16         Connection connection = ConnectionUtils.getConnection();
    17         //从连接中获取一个通道
    18         Channel channel = connection.createChannel();
    19         //创建队列
    20         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    21         String message = "hello world";
    22         //往队列里发送消息
    23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    24         System.out.println(" [x] Sent '" + message + "'");
    25         //关闭通道
    26         channel.close();
    27         //关闭连接
    28         connection.close();
    29     }
    30 }
    View Code

     获取连接类的代码如下:

     1 package rabbitmq.utils;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 
    10 public class ConnectionUtils {
    11     
    12     //返回一个连接类
    13     public static Connection getConnection() throws IOException, TimeoutException {
    14         //创建一个连接工厂
    15         ConnectionFactory factory = new ConnectionFactory();
    16         //设置服务地址
    17         factory.setHost("localhost");
    18         //设置用户
    19         factory.setUsername("guest");
    20         //密码
    21         factory.setPassword("guest");
    22         //设置端口 不设置默认是5672
    23         factory.setPort(5672);
    24         //创建一个新的连接    
    25         Connection connection = factory.newConnection();
    26         return connection;
    27     }
    28         
    29 }
    View Code

      执行代码后,在rabbitmq的界面会看到你创建的队列,并且里面有一条消息,如下

    3、 Receiving(消费者)

      消费者从队列中接收信息,消费者监听消息队列,一旦队列中有消息,队列将消息发送到消费者,消息就从队列中删除。

                              

      消费者代码如下:

     1 package rabbitmq.main;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.AMQP;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.Consumer;
    10 import com.rabbitmq.client.DefaultConsumer;
    11 import com.rabbitmq.client.Envelope;
    12 import com.rabbitmq.client.AMQP.BasicProperties;
    13 
    14 import rabbitmq.utils.ConnectionUtils;
    15 
    16 public class Recv {
    17     private static final String QUEUE_NAME = "rabbitmq_queue";
    18 
    19     public static void main(String[] args) throws IOException, TimeoutException {
    20         //获取连接
    21         Connection connection = ConnectionUtils.getConnection();
    22         //创建 管道
    23         Channel channel = connection.createChannel();
    24         //创建声明队列(可有可无)
    25         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    26         Consumer consumer = new DefaultConsumer(channel) {
    27             @Override
    28             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    29                     throws IOException {
    30                 String message = new String(body, "UTF-8");
    31                 System.out.println(" [x] Received '" + message + "'");
    32             }
    33         };
    34         //监听队列
    35         channel.basicConsume(QUEUE_NAME, true, consumer);
    36     }
    37     
    38 }
    View Code

      执行后,将收到一个消息并打印到控制台,消息队列中就没有消息了,如下:

  • 相关阅读:
    【网易官方】极客战记(codecombat)攻略-森林-村庄守卫village-warder
    【网易官方】极客战记(codecombat)攻略-森林-乡村漫游者village-rover
    【网易官方】极客战记(codecombat)攻略-森林-Agrippa 守卫战 B-the-agrippa-defense-b
    【网易官方】极客战记(codecombat)攻略-森林-Agrippa 守卫战A-the-agrippa-defense-a
    【网易官方】极客战记(codecombat)攻略-森林-Agrippa守卫战the-agrippa-defense
    【网易官方】极客战记(codecombat)攻略-森林-以静制动stillness-in-motion
    【网易官方】极客战记(codecombat)攻略-森林-跃火林中forest-fire-dancing
    Can not deserialize instance of xxx out of START_ARRAY token
    Springboot/cloud 项目突然出现许多Failed to read artifact descriptor, 或者无法解析
    redis-deskmanager 连不上 虚拟机
  • 原文地址:https://www.cnblogs.com/Hxinguan/p/9185013.html
Copyright © 2011-2022 走看看