zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列总结

    AMQP[高级消息队列协议] 是一个异步消息传递所使用的应用层协议规范(是线路层协议)
    AMQP 客户端能够无视消息的来源任意发送和接受信息

    队列的使用场景:

    1、与业务的主要逻辑无关,但又需要执行,就可以使用队列异步处理。

      将与主逻辑无关的操作放入队列,因为程序要等待全部操作执行完才返回用户操作结果,就会减低用户体验。

    2、耗时操作放队列执行、一般不关心执行结果的(发送邮件)。

    3、利用队列的特性,可以使一些应用更加简单。

      1、队列支持多种语言客户端,可以实现不同语言间的相互通信。

      2、队列有支持delay特性,可以实现简单的定时效果。

    4、两个(多个)系统间需要通过定时任务来同步数据(数据交换)。

    5、异构系统的不同进程间相互调用、通讯。

    基本概念:

    Broker:简单来说就是消息队列服务器实体。

      接收客户端连接,实现AMQP消息队列和路由功能的进程。

    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

      从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,ExchangeType有direct、Fanout和Topic三种。

    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

       当消息不能被正常消费时缓存这些消息。(存储还未被消费者消费的消息)

    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

      1、即路由规则,告诉交换机将何种类型的消息发送到某个队列中。

      2、Exchange在与多个Message Queue发生Binding后会生成一张路由表。

      3、路由表中存储着Message Queue所需消息的限制条件即Binding Key,当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。

    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。(每个virtual Host里面有若干个Exchange和Queue)

      虚拟机可以持有多个交换机、队列和绑定。

    producer:消息生产者,就是投递消息的程序。

    consumer:消息消费者,就是接受消息的程序。

    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

      1、仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。

      2、一个Connection可以包含多个Channel,需要Channel是因为TCP连接的建立和释放都是十分昂贵的。

    message: 消息体,由producer产生,经broker被consumer所消费。

      包括Header,Body两部分,Header是由Producer添加上的各种属性集合,控制Message是否被缓存,接收的queue是哪个等等。Body是真正需要传送的数据。

    Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

    模型:

    Publisher --> Broker(消息队列服务器实体) --> Consumer

    client[Publisher Application] --> Message --> Virtual Host[ Exchange-->Message queue ] --> client[Consumer]

    RabbitMQ的使用场景:

    1、单发送,单接收。

      简单的发送与接收,没有特别的处理。

    2、单发送多接收。

      一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。

    3、发布、订阅模式。

      发布、订阅模式,发送端发送广播消息,多个接收端接收。

    4、Routing (按路线发送接收)

      发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

    5、Topics (按topic发送接收)

      发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。

    # RabbitMQ的开启关闭命令:
    [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmq-server -detached  start   // 启动rabbitMQ
    [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl status 			 	  // 查看状态
    [root@localhost ~]# /opt/rabbitmq/sbin/rabbitmqctl stop				 	  // 关闭rabbitMQ
    

    实现的PHP代码:

    消息消费者端:

    <?php 
    
    //连接RabbitMQ  
    $conn_args = array( 
    	'host'=>'localhost' , 
    	'port'=> '5672', 
    	'login'=>'guest' , 
    	'password'=> 'guest',
    	'vhost' =>'/'
    );  
    $conn = new AMQPConnection($conn_args);  
    $conn->connect();  
    //设置queue名称,使用exchange,绑定routingkey  
    $channel = new AMQPChannel($conn);  
    $q = new AMQPQueue($channel);  
    $q->setName('queue_name');  
    $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);  
    $q->declareQueue();  
    $q->bind('direct_exchange_name', 'routingkey_name');     
    //消息获取  
    $messages = $q->get(AMQP_AUTOACK) ;  
    if ($messages){  
          var_dump(json_decode($messages->getBody(), true ));  
    }  
    $conn->disconnect(); 

    消息生产者端:

    <?php 
    
    //连接RabbitMQ  
    $conn_args = array( 
    	'host'=>'localhost' , 
    	'port'=> '5672', 
    	'login'=>'guest' ,   
    	'password'=> 'guest',
    	'vhost' =>'/'
    );  
    $conn = new AMQPConnection($conn_args);  
    $conn->connect();  
    //创建exchange名称和类型  
    $channel = new AMQPChannel($conn);  
    $ex = new AMQPExchange($channel);  
    $ex->setName('direct_exchange_name');  
    $ex->setType(AMQP_EX_TYPE_DIRECT);  
    $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);  
    $ex->declareExchange();  
    //创建queue名称,使用exchange,绑定routingkey  
    $q = new AMQPQueue($channel);  
    $q->setName('queue_name');  
    $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);  
    $q->declareQueue();  
    $q->bind('direct_exchange_name', 'routingkey_name');  
    //消息发布  
    $channel->startTransaction();  
    $message = json_encode(array('Hello World!','DIRECT'));  
    $ex->publish($message, 'routingkey_name');  
    $channel->commitTransaction();  
    $conn->disconnect(); 
    
  • 相关阅读:
    mysql优化——语句优化小技巧
    mysql优化——索引与索引优化
    Mysql存储引擎
    Mysql优化技术
    数据库设计——三范式
    java多线程(二)——用到的设计模式
    java多线程(一)
    ubuntu下如何查看用户登录及用户操作相关信息
    hdu 2546
    hdu 2955
  • 原文地址:https://www.cnblogs.com/xi-jie/p/11821180.html
Copyright © 2011-2022 走看看