zoukankan      html  css  js  c++  java
  • RabbitMQ-从基础到实战(1)— Hello RabbitMQ

    转自:https://yq.aliyun.com/articles/589923

    1.简介

    本篇博文介绍了在windows平台下安装RabbitMQ Server端,并用JAVA代码实现收发消息

    2.安装RabbitMQ

    1. RabbitMQ是用Erlang开发的,所以需要先安装Erlang环境,在这里下载对应系统的Erlang安装包进行安装
    2. 点击这里下载对应平台的RabbitMQ安装包进行安装

    Windows平台安装完成后如图

    3.启用RabbitMQ Web控制台

    RabbitMQ提供一个控制台,用于管理和监控RabbitMQ,默认是不启动的,需要运行以下命令进行启动

    1. 点击上图的Rabbit Command Prompt,打开rabbitMQ控制台
    2. 官方介绍管理控制台的页面,可以看到,输入以下命令启动后台控制插件

      rabbitmq-plugins enable rabbitmq_management

    3. 登录后台页面:http://localhost:15672/   密码和用户名都是 guest ,界面如下

    目前可以先不用理会此界面,后面使用到时会详细介绍,也可以到这里查看官方文档。

    4.编写MessageSender

    Spring对RabbitMQ已经进行了封装,正常使用中,会使用Spring集成,第一个项目中,我们先不考虑那么多

    在IDE中新建一个Maven项目,并在pom.xml中贴入如下依赖,RabbitMQ的最新版本依赖可以在这里找到

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.1.0</version>
    </dependency>

    等待Maven下载完成后,就可以在Maven Dependencies中看到RabbitMQ的JAR

    在这里,我们发现,RabbitMQ的日志依赖了slf4j-api这个包,slf4j-api并不是一个日志实现,这样子是打不出日志的,所以,我们给pom加上一个日志实现,这里用了logback

    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.1</version>
    </dependency>

    之后maven依赖如下,可以放心写代码了

    新建一个MessageSender类,代码如下

    复制代码
    复制代码
     1 import java.io.IOException;
     2 import java.util.concurrent.TimeoutException;
     3 
     4 import org.slf4j.Logger;
     5 import org.slf4j.LoggerFactory;
     6 
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 public class MessageSender {
    12     
    13     private Logger logger = LoggerFactory.getLogger(MessageSender.class);
    14 
    15     //声明一个队列名字
    16     private final static String QUEUE_NAME = "hello";
    17     
    18     public boolean sendMessage(String message){
    19         //new一个RabbitMQ的连接工厂
    20         ConnectionFactory factory = new ConnectionFactory();
    21         //设置需要连接的RabbitMQ地址,这里指向本机
    22         factory.setHost("127.0.0.1");
    23         Connection connection = null;
    24         Channel channel = null;
    25         try {
    26             //尝试获取一个连接
    27             connection = factory.newConnection();
    28             //尝试创建一个channel
    29             channel = connection.createChannel();
    30             //这里的参数在后面详解
    31             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    32             //注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String
    33             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    34             logger.info("Sent '" + message + "'");
    35             //关闭channel和连接
    36             channel.close();
    37             connection.close();
    38         } catch (IOException | TimeoutException e) {
    39             //失败后记录日志,返回false,代表发送失败
    40             logger.error("send message faild!",e);
    41             return false;
    42         }
    43         return true;
    44     }
    45 }
    复制代码
    复制代码

    然后在App类的main方法中调用sendMessage

    1 public class App {
    2     public static void main( String[] args ){
    3         MessageSender sender = new MessageSender();
    4         sender.sendMessage("hello RabbitMQ!");
    5     }
    6 }

    打印日志如下

    打开RabbitMQ的控制台,可以看到消息已经进到了RabbitMQ中

    点进去,用控制台自带的getMessage功能,可以看到消息已经成功由RabbitMQ管理了

    至此,MessageSender已经写好了,在该类的31和33行,我们分别调用了队列声明和消息发送

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

    queueDeclare,有很多参数,我们可以看一下他的源码,注释上有详细的解释,我简单翻译了一下

    复制代码
    复制代码
     1 /**
     2      * Declare a queue 声明一个队列
     3      * @see com.rabbitmq.client.AMQP.Queue.Declare
     4      * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     5      * @param queue the name of the queue队列的名字
     6      * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,为true则在rabbitMQ重启后生存
     7      * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除)
     8      * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自动删除,在最后一个连接断开后删除队列
     9      * @param arguments other properties (construction arguments) for the queue 其他参数
    10      * @return a declaration-confirm method to indicate the queue was successfully declared
    11      * @throws java.io.IOException if an error is encountered
    12      */
    13     Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    14                                  Map<String, Object> arguments) throws IOException;
    复制代码
    复制代码

    前面4个都非常好理解,最后一个“其他参数”,到底是什么其他参数,这个东西真的很难找,用到再解释吧,官方文档如下

    • TTL Time To Live  存活时间

    basicPublish的翻译如下

    复制代码
    复制代码
     1  /**
     2      * Publish a message.发送一条消息
     3      *
     4      * Publishing to a non-existent exchange will result in a channel-level
     5      * protocol exception, which closes the channel.
     6      *
     7      * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     8      * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
     9      *
    10      * @see com.rabbitmq.client.AMQP.Basic.Publish
    11      * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
    12      * @param exchange the exchange to publish the message to 交换模式,会在后面讲,官方文档在这里
    13      * @param routingKey the routing key 控制消息发送到哪个队列
    14      * @param props other properties for the message - routing headers etc 其他参数
    15      * @param body the message body 消息,byte数组
    16      * @throws java.io.IOException if an error is encountered
    17      */
    18     void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    复制代码
    复制代码

    这里又有个其他参数,它的类型是这样的,设置消息的一些详细属性

    5.编写MessageConsumer

    为了和Sender区分开,新建一个Maven项目MessageConsumer

    复制代码
    复制代码
     1 package com.liyang.ticktock.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 import com.rabbitmq.client.AMQP;
    10 import com.rabbitmq.client.Channel;
    11 import com.rabbitmq.client.Connection;
    12 import com.rabbitmq.client.ConnectionFactory;
    13 import com.rabbitmq.client.Consumer;
    14 import com.rabbitmq.client.DefaultConsumer;
    15 import com.rabbitmq.client.Envelope;
    16 
    17 public class MessageConsumer {
    18     
    19     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    20     
    21     public boolean consume(String queueName){
    22         //连接RabbitMQ
    23         ConnectionFactory factory = new ConnectionFactory();
    24         factory.setHost("127.0.0.1");
    25         Connection connection = null;
    26         Channel channel = null;
    27         try {
    28             connection = factory.newConnection();
    29             channel = connection.createChannel();
    30             //这里声明queue是为了取消息的时候,queue肯定会存在
    31             //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue
    32             channel.queueDeclare(queueName, false, false, false, null);
    33             
    34             //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
    35             Consumer consumer = new DefaultConsumer(channel){
    36                 @Override
    37                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
    38                           throws IOException {
    39                         String message = new String(body, "UTF-8");
    40                         logger.info("Received '" + message + "'");
    41                 }
    42             };
    43             //上面是声明消费者,这里用声明的消费者消费掉队列中的消息
    44             channel.basicConsume(queueName, true, consumer);
    45             
    46             //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
    47            
    48         } catch (IOException | TimeoutException e) {
    49             //失败后记录日志,返回false,代表消费失败
    50             logger.error("send message faild!",e);
    51             return false;
    52         }
    53         
    54         
    55         return true;
    56     }
    57 }
    复制代码
    复制代码

    然后在App的main方法中调用Cunsumer进行消费

    复制代码
    复制代码
     1 public class App 
     2 {
     3     //这个队列名字要和生产者中的名字一样,否则找不到队列
     4     private final static String QUEUE_NAME = "hello";
     5     
     6     public static void main( String[] args )
     7     {
     8         MessageConsumer consumer = new MessageConsumer();
     9         consumer.consume(QUEUE_NAME);
    10     }
    11 }
    复制代码
    复制代码

    结果如下,消费者一直在等待消息,每次有消息进来,就会立刻消费掉

    6.多个消费者同时消费一个队列

    改造一下Consumer

    在App中new多个消费者

    改造Sender,使它不停的往RabbitMQ中发送消息

    启动Sender

    启动Consumer,发现消息很平均的发给四个客户端,一人一个,谁也不插队

    如果我们把速度加快呢?把Sender的休息时间去掉,发现消费开始变得没有规律了,其实呢,它还是有规律的,这个是RabbitMQ的特性,称作“Round-robin dispatching”,消息会平均的发送给每一个消费者,可以看第一第二行,消息分别是56981和56985,相应的82、82、84都被分给了其他线程,只是在当前线程的时间片内,可以处理这么多任务,所以就一次打印出来了

  • 相关阅读:
    Android编程动态创建视图View的方法
    ImageView的属性android:scaleType,即ImageView.setScaleType(ImageView.ScaleType)
    SQL 2008 数据库迁移
    Response.Write具体介绍
    美国地名大全(美国城市名称英文、中文)
    MySQL 暂时文件夹
    任务管理器进程中多个chrome.exe的问题
    PNG文件格式具体解释
    秒杀多线程第四篇 一个经典的多线程同步问题
    Metropolis Hasting算法
  • 原文地址:https://www.cnblogs.com/sharpest/p/10425250.html
Copyright © 2011-2022 走看看