zoukankan      html  css  js  c++  java
  • RabbitMq 初步

    RabbitMQ的工作原理

    它的基本结构

    组成部分说明如下:

    Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

    Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

    Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

    Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

    Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

     

    Maven举例配置

    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp‐client</artifactId>
    <version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐logging</artifactId>
    </dependency>

    生产者举例Demo

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitmqConfig {
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
        public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
        public static final String ROUTINGKEY_SMS="inform.#.sms.#";
    
        //声明交换机
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange EXCHANGE_TOPICS_INFORM(){
            //durable(true) 持久化,mq重启之后交换机还在
            return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        }
    
        //声明QUEUE_INFORM_EMAIL队列
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue QUEUE_INFORM_EMAIL(){
            return new Queue(QUEUE_INFORM_EMAIL);
        }
        //声明QUEUE_INFORM_SMS队列
        @Bean(QUEUE_INFORM_SMS)
        public Queue QUEUE_INFORM_SMS(){
            return new Queue(QUEUE_INFORM_SMS);
        }
    
        //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
        @Bean
        public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
        }
        //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
        @Bean
        public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
        }
    }

    消费者举例Demo

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitmqConfig {
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
        public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
        public static final String ROUTINGKEY_SMS="inform.#.sms.#";
    
        //声明交换机
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange EXCHANGE_TOPICS_INFORM(){
            //durable(true) 持久化,mq重启之后交换机还在
            return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        }
    
        //声明QUEUE_INFORM_EMAIL队列
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue QUEUE_INFORM_EMAIL(){
            return new Queue(QUEUE_INFORM_EMAIL);
        }
        //声明QUEUE_INFORM_SMS队列
        @Bean(QUEUE_INFORM_SMS)
        public Queue QUEUE_INFORM_SMS(){
            return new Queue(QUEUE_INFORM_SMS);
        }
    
        //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
        @Bean
        public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
        }
        //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
        @Bean
        public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
        }
    }

    工作模式 

    RabbitMQ有以下几种工作模式 :

    1、Work queues

    2、Publish/Subscribe

    3、Routing

    4、Topics

    5、Header

    6、RPC

     

    Work queues

    work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。

    应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

    测试:

    1、使用入门程序,启动多个消费者。

    2、生产者发送多个消息。

    结果:

    1、一条消息只会被一个消费者接收;

    2、rabbit采用轮询的方式将消息是平均发送给消费者的;

    3、消费者在处理完某条消息后,才会收到下一条消息。

     

    Publish/subscribe 发布订阅模式

    发布订阅模式:

    1、每个消费者监听自己的队列。

    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收

    到消息

     

    Routin

    路由模式:

    1、每个消费者监听自己的队列,并且设置routingkey。

    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

    这是一种非常灵活的模式,经常被用到

     

    Topics

     

     

    路由模式:

    1、每个消费者监听自己的队列,并且设置带统配符的routingkey。

    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

     

    Header模式

    header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配

    队列。

    案例:

    根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种

    通知类型都接收的则两种通知都有效。

     

    生产者Demo:

     

    Map<String, Object> headers_email = new Hashtable<String, Object>();
    headers_email.put("inform_type", "email");
    Map<String, Object> headers_sms = new Hashtable<String, Object>();
    headers_sms.put("inform_type", "sms");
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

    通知Demo :

    String message = "email inform to user"+i;
    Map<String,Object> headers = new Hashtable<String, Object>();
    headers.put("inform_type", "email");//匹配email通知消费者绑定的header
    //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
    properties.headers(headers);
    //Email通知
    channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

    发送邮件消费者 : 

    channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
    Map<String, Object> headers_email = new Hashtable<String, Object>();
    headers_email.put("inform_email", "email");
    //交换机和队列绑定
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
    //指定消费队列
    channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

    RPC

     

    RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

    1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

    2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

    3、服务端将RPC方法 的结果发送到RPC响应队列

    4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

     

     

  • 相关阅读:
    linux process management
    intel edison with grove lcd
    ODBC、OLEDB和ADO关系
    在线并使用数据库来推断在线
    大约laravel错误的解决方案
    cocos2dx-3.1加入cocosStudio参考库 libCocosStudio
    【甘道夫】基于Mahout0.9+CDH5.2执行分布式ItemCF推荐算法
    格式字符串分配stl::string
    希望开发一个指针的元素
    [Now] Configure secrets and environment variables with Zeit’s Now
  • 原文地址:https://www.cnblogs.com/zhenghongxin/p/10792818.html
Copyright © 2011-2022 走看看