RabbitMQ是目前非常热门的一款消息中间件,凭借其高可靠,易扩展,高可用及丰富的功能特性受到越来越多企业的青睐。
RabbitMQ时采用Erlang语言实现AMQP的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
RabbitMQ具体特点如下:
- 可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化,传输确认及发布确认等。
- 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ已经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
- 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
- 多语言客户端:RabbitMQ几乎支持所有常用语言,比如Java,Python,Ruby,PHP,C#,JavaScript等。
- 管理界面:RabbitMQ提供了一易用的用户界面,使得用户可以监控和管理消息,集群中的节点等。
- 插件机制:RabbitMQ提供了许多插件,以实现从多方面扩展,当然也可以编写自己的插件。
1.消息中间件
- 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如:只包含文本字符串,JSON等,也可以很复杂,如内嵌对象。
- 消息队列中间件(Message Queue Middleware,简称为MQ)是指利用高效可靠消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
- 消息队列中间件,也可以称为消息队列或者消息中间件。一般有两种模式:点对点(P2P)模式,发布/订阅(Pub/Sub)模式。点对点模式:基于队列,生产者发送消息到队列,消费者从队列中接受消息,队列的存在使得消息的异步传输称为可能。发布/订阅模式:定义了如何向一个内容节点(主题topic,可认为是消息传递中介)发布和订阅消息,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
- 目前开源的消息中间件,比较主流的有:RabbitMQ,Kafka,ActiveMQ,RocketMQ等。面向消息的中间件(简称为MOM)提供了松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序中间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。消息中间件提供了有保证的消息发送。应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。
2.消息中间件的作用
在不同的应用场景下可以展现不同的作用。
- 解耦:在项目启动之初来预测会碰到什么需求是极其困难的。消息中间件在处理过程中间插入了一个隐含的,基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的皆苦约束即可。
- 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在把一个消息从消息中间件中删除之前,需要你的处理系统明确地指出改消息已经被处理完成,从而确保你的数据被安全地保存直到你使用完毕。
- 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
- 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用消息中间件能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
- 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
- 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。
- 缓冲:在任何重要的系统中,都会存在需要不同处理时间元素。消息中间件通过一个缓冲层来帮助任务高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
- 异步通信:在很多时候应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。
3.RabbitMQ的安装及使用
3.1在安装RabbitMQ之前需要安装Erlang,建议使用较新版本Erlang,这样可以获得较多更新和改进。官网下载地址:https://www.erlang.org/downloads
[root@instance-5x tar.gz]# tar -zxvf otp_src_20.0.tar.gz -C /usr/local/src
[root@instance-5x tar.gz]# cd /usr/local/src/otp_src_20.0/
[root@instance-5x otp_src_20.0]# ./configure --prefix=/usr/local/erlang
[root@instance-5x otp_src_20.0]# make
[root@instance-5x otp_src_20.0]# make install
若:出现报错信息:No curses library functions found。需要安装ncurses。
[root@instance-5x otp_src_20.0]# yum install ncurses-devel
配置环境变量
[root@instance-5x otp_src_20.0]# vim /etc/profile
export ERLANG_HOME=/usr/local/erlang
PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin
[root@instance-5x otp_src_20.0]# . /etc/profile
输入erl命令来验证Erlang是否安装成功
[root@instance-5x otp_src_20.0]# erl
Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V9.0 (abort with ^G)
1>
3.2RabbitMQ的安装
官网下载地址:https://www.rabbitmq.com/releases/rabbitmq-server/ 安装包解压到相应的目下即可。
[root@instance-5x tar.gz]# tar -xvf rabbitmq-server-generic-unix-3.6.10.tar /usr/local/
配置环境变量
export RABBITMQ_HOME=/usr/local/rabbitmq_server-3.6.10
PATH=$PATH:$JAVA_HOME/bin:$ERLANG_HOME/bin:$RABBITMQ_HOME/sbin
[root@instance-5x otp_src_20.0]# . /etc/profile
运行RabbitMQ服务
[root@instance-5x otp_src_20.0]# rabbitmq-server -detached
rabbitmq-server 命令后面添加一个“-detached” 参数是为了能够让RabbitMQ服务以守护进程的方式在后台运行,这样就不会因为当前Shell窗口的关闭而影响服务。
rabbitmqctl status 命令查看RabbitMQ是否正常启动:
[root@instance-5x tar.gz]# rabbitmqctl status
Status of node 'rabbit@instance-och69p5x'
[{pid,25344},
{running_applications,
[{rabbit,"RabbitMQ","3.6.10"},
{mnesia,"MNESIA CXC 138 12","4.15"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.6.10"},
{ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
{ssl,"Erlang/OTP SSL application","8.2"},
{public_key,"Public key infrastructure","1.4.1"},
{asn1,"The Erlang ASN1 compiler version 5.0","5.0"},
{xmerl,"XML parser","1.3.15"},
{os_mon,"CPO CXC 138 46","2.4.2"},
{syntax_tools,"Syntax tools","2.1.2"},
{crypto,"CRYPTO","4.0"},
{compiler,"ERTS CXC 138 10","7.1"},
{sasl,"SASL CXC 138 11","3.0.4"},
{stdlib,"ERTS CXC 138 10","3.4"},
{kernel,"ERTS CXC 138 10","5.3"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:64] [hipe] [kernel-poll:true]
"},
{memory,
[{total,51716936},
{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,0},
{queue_procs,30584},
{queue_slave_procs,0},
{plugins,0},
{other_proc,18867000},
{mnesia,66104},
{metrics,184432},
{mgmt_db,0},
{msg_index,48984},
{other_ets,1810296},
{binary,448456},
{code,21385216},
{atom,891849},
{other_system,8165535}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,415639142},
{disk_free_limit,50000000},
{disk_free,32308707328},
{file_descriptors,
[{total_limit,65435},
{total_used,3},
{sockets_limit,58889},
{sockets_used,0}]},
{processes,[{limit,1048576},{used,156}]},
{run_queue,0},
{uptime,24578},
{kernel,{net_ticktime,60}}]
4.生产和消费消息
Java代码展示
maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.1</version>
</dependency>
默认情况下,访问RabbitMQ服务的用户名和密码都是“guest”,这个账户有限制,默认只能通过本地网络(如 localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限。
添加新用户,用户名为“root”,密码为“root123”
[root@instance-5x tar.gz]# rabbitmqctl add_user root root123
[root@instance-5x tar.gz]# rabbitmqctl setpermissions -p / root ".*" ".*" ".*"
[root@instance-5x tar.gz]# rabbitmqctl set_user_tags root administrator
提供者客户端代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMqProducer {
private static String EXCHANGE_NAME = "exchage_demo";
private static String QUEUE_NAME = "queue_demo";
private static String ROUTING_KEY = "routingkey_demo";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//创建一个type="direct",持久化,非自动删除得交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//创建一个持久化,非排他的,非自动删除队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将交换机与队列通过路由键(其实是绑定键,只不过direct类型下绑定键(bindingkey)和路由键(routingkey)一致才可以到达)绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//发送一条持久化的消息
String message = "hello world d!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
//关闭资源
channel.close();
connection.close();
System.out.println("finished...");
}
}
消费者客户端代码:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class RabbitMqConsumer {
private static String QUEUE_NAME = "queue_demo";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建信道
final Channel channel = connection.createChannel();
//设置客户端最多接受未被ack的消息个数
channel.basicQos(64);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("recv message:" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调资源执行完毕后关掉资源
TimeUnit.SECONDS.sleep(5);
//关闭资源
channel.close();
connection.close();
System.out.println("finished...");
}
}
连接工具类代码:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
private static ConnectionFactory factory;
static {
factory = new ConnectionFactory();
factory.setHost("a.x.y.z");
factory.setPort(5672);
factory.setUsername("username");
factory.setPassword("password");
}
public static Connection getConnection() throws IOException, TimeoutException {
return factory.newConnection();
}
}