RabbitMQ 是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。它遵循Mozilla Public License开源协议,采用 Erlang 实现的工业级的消息队列(MQ)服务器。
一、应用场景
异步处理
应用解耦
流量消峰
日志收集
二、支持多种语言
比如Js广告收集,大量的广告收集 会用到消息队列
消息中间件--》流实时计算
消息中间件--》离线计算
三、java操作rabbitmq
1 simple 简单队列
2 work queues 工作队列、公平分发、轮询分发
3 publish/subscribe 发布订阅
4 routing 路由选择 通配符模式
5 topics 主题
6 手动和自动确认消息
7 队列的持久化和非持久化
8 rabbitmq 的延迟队列 场景 :未支付订单30分钟取消
百度统计 cnzz架构
rabbitmq AMQP协议
添加用户
创建库 以/开头
virtual host
对用户授权
控制台 overview
Protocol | Node | Bound to | Port |
---|---|---|---|
amqp | rabbit1@mqnode1 | :: | 30007 |
amqp | rabbit2@mqnode2 | :: | 30007 |
amqp | rabbit3@mqnode3 | :: | 30007 |
clustering | rabbit1@mqnode1 | :: | 50001 |
clustering | rabbit2@mqnode2 | :: | 50001 |
clustering | rabbit3@mqnode3 | :: | 50001 |
http | rabbit1@mqnode1 | :: | 30008 |
http | rabbit2@mqnode2 | :: | 30008 |
http | rabbit3@mqnode3 | :: | 30008 |
amqp 开发所有接口,qmqp协议
clustering 集群端口
http http访问端口
connections 当前有哪些连接
chaannels 当前有那些频道
exchanges 交换机
队列
简单队列
下载安装
RabbitMQ
https://www.rabbitmq.com/install-rpm.html#downloads
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.17
erlang
https://www.erlang-solutions.com/resources/download.html
教程
https://blog.csdn.net/u013887008/article/details/100859070
安装erlang
rpm -ivh esl-erlang_22.0.7-1~centos~7_amd64.rpm
如果报错如下:
警告:esl-erlang_22.0-1_centos_7_amd64.rpm: 头V4 RSA/SHA256 Signature, 密钥 ID a14f4fca: NOKEY
则需要先执行下面命令,安装依赖,在执行安装的命令:
-
sudo yum install epel-release
-
sudo yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
3.执行下面命令进行验证:erl
4.执行如下命令退出:halt(). "注意这个.点,不能丢"
查看rpm 包安装到哪了
rpm -ql XXX.rpm
二、开始安装rabbitmq
pm -ivh --prefix= /opt/temp xxx.rpm
1.下载3.7.15版本的rabbitmq,与Erlang版本要对应上
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm
## 可能需要先安装!!
yum install socat
启动rabbitmq
1.开机启动:
chkconfig rabbitmq-server on
2.查看启动状态:
rabbitmqctl status
3.启动,关闭,重启:
systemctl start rabbitmq-server.service
systemctl stop rabbitmq-server.service
systemctl restart rabbitmq-server.service
启动web管理台
rabbitmq-plugins enable rabbitmq_management
访问: http://192.168.93.129:15672
,默认用户:guest/guest
,但登陆时显示User can only log in via localhost!!!
解决方案
找到文件/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app
:
将 {loopback_users, [<<"guest">>]},
改为{loopback_users, []},
然后重启服务即可:
systemctl restart rabbitmq-server.service
简单消息队列
java 调用mq,一对一 单个生产者,一个消息者
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> </dependencies>
生产者
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ProducterDirectDemo { private static String queneName = "testQuene"; public static Connection getConnection() { Connection connection = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.80.110"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); // 创建与RabbitMQ服务器的TCP连接 connection = factory.newConnection(); } catch (Exception ex) { ex.printStackTrace(); } return connection; } public static void main(String[] args) throws IOException, TimeoutException { Connection connection = getConnection(); Channel channel = null; try { // 创建一个频道 channel = connection.createChannel(); // 声明默认的队列 channel.queueDeclare(queneName, false, false, false, null); String msg = "发送消息"; channel.basicPublish("", queneName, null, msg.getBytes()); } catch (Exception ex) { ex.printStackTrace(); } finally { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } } }
消费者
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer { private final static String QUEUE_NAME = "testQuene"; //队列名称 public static void receive() throws IOException, TimeoutException { //由连接工厂创建连接 Connection connection = ProducterDirectDemo.getConnection(); //通过连接创建信道 Channel channel = connection.createChannel(); //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel){ //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { java.lang.String msg = new java.lang.String(body); System.out.println("received msg: " + msg); } }; //监听指定的queue。会一直监听。 //参数:要监听的queue、是否自动确认消息、使用的Consumer channel.basicConsume(QUEUE_NAME, true, consumer); } public static void main(String[] args) throws IOException, TimeoutException { receive(); } }
2、work queues工作队列
一个生产者多个消费者
多个客户端会均分,不管哪个处理的快哪个处理的慢
采用轮询的机制
采用公平分发,要关闭自动应答
3、autoAck 自动确认后从内存中删除
4、如果强制杀死正在执行的消费者就会丢失消息
如果不想丢失消息,要设置成手动
5、发布订阅模式
生产者把消息发送到交换机(exchange),交换机把消息发送到消息队列,一个队列对应一个消费者
路由规则
往交换机中发送消息
如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的
6、topic 主题模式
将路由键和某模式匹配
# 匹配一个或多个
* 匹配一个
Good.#
7.rabbitmq 消息确认机制(事务)
生产有没有把消息发送到消息队列,默认是不知道的
两种方式
amqp 实现了事务机制
txSelect 用户将当前chanel 设置成transation
txCommit 提交事务
txRollback 回滚事务
降低了消息的吞吐量
confirm 模式
生产者端confirm 实现原理
confirm 异步模式
开启confirm 模式
发一条 waitforconfim 单条效率偏低
发一批 waitforconfim
异步模式