AMQP[高级消息队列协议] 是一个异步消息传递所使用的应用层协议规范(是线路层协议)
AMQP 客户端能够无视消息的来源任意发送和接受信息
队列的使用场景:
1、与业务的主要逻辑无关,但又需要执行,就可以使用队列异步处理。
将与主逻辑无关的操作放入队列,因为程序要等待全部操作执行完才返回用户操作结果,就会减低用户体验。
2、耗时操作放队列执行、一般不关心执行结果的(发送邮件)。
3、利用队列的特性,可以使一些应用更加简单。
1、队列支持多种语言客户端,可以实现不同语言间的相互通信。
2、队列有支持delay特性,可以实现简单的定时效果。
4、两个(多个)系统间需要通过定时任务来同步数据(数据交换)。
5、异构系统的不同进程间相互调用、通讯。
基本概念:
Broker:简单来说就是消息队列服务器实体。
接收客户端连接,实现AMQP消息队列和路由功能的进程。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,ExchangeType有direct、Fanout和Topic三种。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
当消息不能被正常消费时缓存这些消息。(存储还未被消费者消费的消息)
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
1、即路由规则,告诉交换机将何种类型的消息发送到某个队列中。
2、Exchange在与多个Message Queue发生Binding后会生成一张路由表。
3、路由表中存储着Message Queue所需消息的限制条件即Binding Key,当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。(每个virtual Host里面有若干个Exchange和Queue)
虚拟机可以持有多个交换机、队列和绑定。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
1、仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。
2、一个Connection可以包含多个Channel,需要Channel是因为TCP连接的建立和释放都是十分昂贵的。
message: 消息体,由producer产生,经broker被consumer所消费。
包括Header,Body两部分,Header是由Producer添加上的各种属性集合,控制Message是否被缓存,接收的queue是哪个等等。Body是真正需要传送的数据。
Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
模型:
Publisher --> Broker(消息队列服务器实体) --> Consumer
client[Publisher Application] --> Message --> Virtual Host[ Exchange-->Message queue ] --> client[Consumer]
RabbitMQ的使用场景:
1、单发送,单接收。
简单的发送与接收,没有特别的处理。
2、单发送多接收。
一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
3、发布、订阅模式。
发布、订阅模式,发送端发送广播消息,多个接收端接收。
4、Routing (按路线发送接收)
发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。
5、Topics (按topic发送接收)
发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
# RabbitMQ的开启关闭命令: [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmq-server -detached start // 启动rabbitMQ [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl status // 查看状态 [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl stop // 关闭rabbitMQ
实现的PHP代码:
消息消费者端:
<?php //连接RabbitMQ $conn_args = array( 'host'=>'localhost' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest', 'vhost' =>'/' ); $conn = new AMQPConnection($conn_args); $conn->connect(); //设置queue名称,使用exchange,绑定routingkey $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName('queue_name'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $q->declareQueue(); $q->bind('direct_exchange_name', 'routingkey_name'); //消息获取 $messages = $q->get(AMQP_AUTOACK) ; if ($messages){ var_dump(json_decode($messages->getBody(), true )); } $conn->disconnect();
消息生产者端:
<?php //连接RabbitMQ $conn_args = array( 'host'=>'localhost' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest', 'vhost' =>'/' ); $conn = new AMQPConnection($conn_args); $conn->connect(); //创建exchange名称和类型 $channel = new AMQPChannel($conn); $ex = new AMQPExchange($channel); $ex->setName('direct_exchange_name'); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $ex->declareExchange(); //创建queue名称,使用exchange,绑定routingkey $q = new AMQPQueue($channel); $q->setName('queue_name'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $q->declareQueue(); $q->bind('direct_exchange_name', 'routingkey_name'); //消息发布 $channel->startTransaction(); $message = json_encode(array('Hello World!','DIRECT')); $ex->publish($message, 'routingkey_name'); $channel->commitTransaction(); $conn->disconnect();