amqp协议:
简单队列:
P ----队列-----c
p:消息生产者
队列
c:消费者
偶合性高,生产者一一对应消费者
-------------
work queues 工作队列
一个生产者对应多个消费者
P ---队列---c2/c1/c3
生产者不费时间,消费者消费时间
(轮询机制,每个消费者消费相同的数量,round-robin)
轮询分发:
生产者:
channel.queueDeclare(queue_name,f,f,f,null)//声明
channel.basicPublish("",queue_name,nullm,'消息')
消费者:
channel.Declare(queue_name,f,f,f,null)//声明
channel.basicConsume(queue_name,ture,function(){})//消费者
------
公平分发(fair dipatch):
使用公平分发必须关闭自动应答ack
生产者:
channel.queueDeclare(queue_name,f,f,f,null)//声明
channel.basicQoc(1)//每个消费者发送消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息,限制发送给同一个消费者,不超过一条
消费者:
channel.queueDeclare(queue_name,f,f,f,null)//声明
channel.basicQoc(1)
channel.basicAck(e,false)//发送回执
$autoAck = false//自动应答改为false
channel.basicConsume(queue_name,$autoAck,function(){})//消费者
---------
消息应答与持久化
ack 消息应答((message acknowkedgment)
true 自动确认模式,一发送就删除内存数据,如果杀死该执行的消费者,则会丢失数据
false 手动应答,消息消费之后告诉内存可以删除
消息的持久化
durable (注意,声明好队列就不能修改)
-----
一个消息可以被多个消息消费(订阅模式)
模型:
p -->x--->多个队列--->多个C
x交换机:交换机没有存储能力,在mq中只有队列有存储能力
解读:
1、一个生产者,多个消费者
2、每个消费者有自己的队列
3、生产者没有直接把消息发送到队列 而是发送到交换机上
4、每个队列都要绑定要交换机
5、生产者发送消息,经过交换机,到达队列 就能实现一个消息被多个消费者消费
channel.exchangeDeclare(exchange_name,"fanout")//声明交换机进行分发
$msg = "hello";
channel.basicPublish(exchange_name,"",null,$msg)
消费者
channel.queueDeclare(queue_name,false,false,false,null)//队列声明
channel.queueBind(queueu_name,exchange_name,"");//队列绑定交换机
--------
//声明交换机
chang.exchangeDeclare(exchange_name,"fanout")//分发 第二个参数有(fanout/deret)
chang.basicPublish(exchange_name,"路由键")
fanout(不处理路由键),只需要把队列绑定到交换机,就可以转发到该交换机额所有队列,所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上
direct(处理路由建)所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
topic (会将消息转发到所有匹配topic的queue上) 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”
header
例如:
生产者:
channel.exchangDelare(exchange_name,"direct");
$string = "hello direct!";
channel.basicPublish(exchange_name,routingKey,null $string)
消费者:
channel.queueDeclare(queue_name,f,f,f,null);
channel.basicQos(1);
channel.queueBind(queue_name.exchange,routingKey);//如果有多个,就复制多个
channel.queueBind(queue_name.exchange,routingKey2);//如果有多个,就复制多个
topics exchange 路由和某个模式匹配
#匹配一个或多个
*匹配一个
例:goods.#,可以查到多个
------------
mq消息确认机制(事务+confirm)
可以通过服务器持久化机制,解决rabbitmq服务器异常,数据的丢失
问题:生产者将消息发布到mq服务器中,消息到底有没有送达mq服务器中,默认情况下是不知道的?
两种方式:
amqp实现事务机制
comfirm模式
事务机制
txselect txcommit txrollback
txselect哟农户将当前channel设置成transation模式
txcommit 用于提交事务
txrollback 回滚事务
生产者:
channel.queueDeclare(queue_name,false,false,false,null);//声明一个队列
$a = "aaaaaa"
try{
cnannel.txSelect();//开启事务
channel.basicPublish("",queue_name,null ,$a);
channel.commit();
}catch(exception $e){
channel.xRollback();
}
消费者:
cahnnel.basicConsume(queue_name,true,true,function(){
});
----------
comfirm模式
comfirm模式最大额好处在于异步
生产者端comfirm模式额实现原理
开启comfirm模式
channelConfimSelect();设置为true
编程模式
1、普通waitForConfirms()
2、批量的发一批waitForConfirms
3、异步confirm模式,提供一个回调方法
注意:(如果队列是comfirm模式的话,队列是txselect模式会报错)
普通watFormConfirms模式
生产者:((单条)
channel.queueDeclare(queue_name.false.f.f.null)
channel.confirmSelect();
channel.basicPublish(",queue_name,null,"aaa");
if(channe.waitForConfirms()){
失败
}
生产者:(批量)
channel.queueDeclare(queue_name.false.f.f.null)
channel.confirmSelect();
for(){
channel.basicPublish(",queue_name,null,"aaa");
}
if(channe.waitForConfirms()){
失败
}
异步:
channel.confirmSelect();
------
exchange_declare(exchange,type,durbe,autoDelete,internal,map throws IOException)
exchange 交换机名称
type 类型 fanout/derect/header/topic
durable 是否持久化
autodelete 设置是否自动删除(自动删除前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。)
internal设置是否是内置的,如果是true 则表示是内置的交换器。客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument 其他一些结构化参数,比如alternate-exchange
参数说明:queue_declare(queue_name,durable,exclusive,map<string,object>arguments)
queue:队列名称
durable:是否持久化
exclusive:是否排外的,是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景
queueBind(queue,exchange,routingkey,argument)
exchangeBing(String destination , String source , String routingKey)
参考文章:https://www.cnblogs.com/julyluo/p/6265775.html
参考文章:https://blog.csdn.net/samxx8/article/details/47417133