学习地址
视频:https://www.bilibili.com/video/av18997807/?p=1
相关概述
1、消息队列解决了什么问题?
异步处理,应用解耦,流量削锋,日志处理
2、后台管理页面:http://localhost:15672/#/
如果页面不能访问,则需要在AppData目录下删除两个文件夹-见视频
账号&密码:guest&guest
3、virtual hosts 相当于mysql的数据库,授权哪些用户可以进入
=》一般以/开头
4、amqp就是与RabbitMQ通信的协议
简单队列
1、简单队列的不足?
1)耦合性高,生产者一一对应消费者(如果我想要多个消费者消费队列中的消息,这时候就不行了)
2)队列名变更,这时候得同时变更
工作队列
1、为什么会出现工作队列?
1)因为简单队列是一一对应的
2)我们实际开发时,生产者发送消息是毫不费力的,而消费者一般是要跟业务相结合的,消费者接收到消息之后就需要处理,可能需要花费事件,这时候就会积压了很多信息。
现象:(设置了两个消费者的响应时间一快一慢)
消费者1和消费者2处理的消息数量是一样的
消费者1:偶数
消费者2:奇数
这种方式叫做轮询分发(round-robin)
=》因为rabbitMQ不了解消费者的能力
公平分发
公平分发的意思就是能者多劳
使用公平分发,必须关闭自动应答,ack改成手动
只需要配置channel.basicQos(prefethCount)即可
消息应答与持久化
1、消息应答
boolean autoAck=true;(自动确认模式),一旦rabbitmq将消息分发给消费者,那么就会从内存中删除;(意思是不管消费者是否消费完了该条信息)
在这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息;boolean autoAck=false;(手动模式),如果有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理
成,你可以删了,然后rabbitmq就会删除内存中的消息;
消息应答默认是打开的,false
ack:Message acknowledgement
如果rabbitmq挂了,我们的消息仍然会丢失。所以需要进行持久化
2、持久化处理
boolean durable=false;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
发布订阅模式
以上均为一个消息只能被一个消费者消费,所以想要解决这个问题,就得使用发布订阅模式
发布订阅模型
-----消息队列-----C1
P=>X----|
-----消息队列-----C2
解读:
1)1个生产者,多个消费者
2)每一个消费者都有自己的队列
3)生产者没有把消息发送到队列,而是发送到了交换机/转换器(Exchange)
4)每个队列到需要绑定到同一个交换机上
交换机:
一方面是接受生产者的消息,一方面是向队列推送消息
匿名转发 ""
=》channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
fanout(不处理路由键)
路由模式 diret
交换机和队列都有一个key进行匹配
见图
商品的所有操作路由到A队列,其他所有操作路由到B队列
如果要实现这个功能,那么就需要将商品所有的key列出来,这样子太麻烦了,所以需要引入topic模式
=>这里所指的全部列出来,是指消费者去消费的时候那个绑定方法
Topic模式
* 匹配一个
# 匹配一个或多个
这个*,我怎么匹配了多个
消息确认机制之事务机制
1、
recv中不用写自动应答等代码
在rabbitmq中,我们可以通过持久化数据去解决rabbitmq服务器异常的数据丢失问题;
问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器,默认的情况是不知道的;
两种方式:
AMQP 实现了事务机制
Confirm模式
事务机制
txSelect 用户将当前channel设置成transation模式
txCommit
txRollback
我发现只要调用了txSelect()方法,其他commit或rollback都没写,也可以保证事务一致性,这条消息照样没发出去
这种模式每次都得开启事务,提交事务
所以这种模式是很耗时的,采用这种方式,降低了rabbitmq的消息吞吐量
2、
confirm单条
confirm批量
channel.confirmSelect();//开启confirm模式
confirm异步模式
Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Channel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集
删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构
备注:
1、rabbitmq-server-3.6.9.exe 激活插件时失败
rabbitmq-server-3.7.4.exe 安装该版本时提示ERLANG7.0版本过低
2、如果队列已经定义好了,那么rabbitmq不允许我们重新定义一个已经存在的队列;
3、这个rabbitmq是自启动的啊
4、在公平分发的时候,如果把receive1和receive2中的basicQos()这个代码注释掉,会造成消费顺序紊乱
5、rabbitmq里面只有队列有存储能力
消费者、生产者均需要声明队列
6、注意send类里面的发布消息语句
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
一个是发布订阅模式,一个是队列模式
注意参数别填错了
7、覆盖某个方法时,形参名称可以改变
英文单词:
clustering 集群
retrieving 检索、恢复
delivery 传送、传递
inequivalent 不等价的
fanout 分发,扇出
相关异常:
Variable 'channel' is accessed from within inner class, needs to be declared final
HTTP 400 请求出错 由于语法格式有误,服务器无法理解此请求。不作修改,客户程序就无法重复此请求。