zoukankan      html  css  js  c++  java
  • RabbitMq初体验

    简介:

    RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
    AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    组成

    • 生产者
    • 消息队列
    • 消费者
    • 交换机:
      • 隔离生产者和消息队列,充当二者中间体。
      • 接受相应的消息并绑定到指定队列

    发布模式

    根据交换机类型不同,分为3种:

    1. Direct<直接>:1对1-----一个消息只能被一个消费者消费
    2. Topic<主题>:1对多-----一个消息可以被多个消费者消费
    3. Fanout<分列>:广播
    注:一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中

    整合springboot

    pom

        <dependency>
        	<groupId>org.springframework.boot</groupId>
        	<artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    

    yml

    spring:
      application:
        name: spring-boot-amqp
      rabbitmq:
        host: xxx.xxx.xxx.xxx
        port: 5672
        username: rabbit
        password: 123456
    

    config

    API:BindingBuilder.bind(指定队列).to(交换机).with(路由键);
    package com.hxtec.polaris.configure;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * API:BindingBuilder.bind(指定队列).to(交换机).with(路由键);
     */
    @Configuration
    public class MyAMQPConfig {
        final static String tpm = "topic.message";
        final static String tp1m = "topic1.message";
        final static String tpm1 = "topic.message1";
        final static String tpm2 = "topic.message2";
    
        @Bean
        public Queue tpm() {
            return new Queue(MyAMQPConfig.tpm);
        }
    
        @Bean
        public Queue tp1m() {
            return new Queue(MyAMQPConfig.tp1m);
        }
    
        @Bean
        public Queue tpm1() {
            return new Queue(MyAMQPConfig.tpm1);
        }
    
        @Bean
        public Queue tpm2() {
            return new Queue(MyAMQPConfig.tpm2);
        }
    
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("topic.exchange");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanout.exchange");
        }
    
        @Bean
        DirectExchange directExchange() {
            return new DirectExchange("direct.exchange");
        }
    
        /**
         * topic单播,只给topic.message发送
         * @param tpm
         * @param topicExchange
         * @return
         */
        @Bean
        Binding bindingTopicExchangeMessage(Queue tpm, TopicExchange topicExchange) {
            return BindingBuilder.bind(tpm).to(topicExchange).with("topic.message");
        }
    
        /**
         * Fanout广播,给绑定fanoutExchange的queues全部发送
         * @param tpm
         * @param fanoutExchange
         * @return
         */
        @Bean
        Binding bindingFanoutExchangeMessage(Queue tpm, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(tpm).to(fanoutExchange);
        }
    
        /**
         * 绑定规则
         * @param tpm
         * @param directExchange
         * @return
         */
        @Bean
        Binding bindingDirectExchangeMessage(Queue tpm,DirectExchange directExchange) {
            return BindingBuilder.bind(tpm).to(directExchange).with("topic.message#");
        }
    }
    
    

    sender

    API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
    package com.hxtec.polaris.ampq;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
     */
    @Component
    public class RabbitHelloSender {
    
    	@Autowired
    	private RabbitTemplate rabbitTemplate;
    
    	public void send() {
    		String context = "hello " + new Date();
    		System.out.println("Sender : " + context);
    		this.rabbitTemplate.convertAndSend("topic.exchange","tpm", context);
    	}
    
    }
    

    receiver

    @RabbitListener(queues = "direct"):监听器监听指定队列
    package com.hxtec.polaris.ampq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @RabbitListener(queues = "direct"):监听器监听指定队列
     */
    @Component
    @RabbitListener(queues = "topic.message")
    public class RabbitHelloReceiver {
    
    	@RabbitHandler
    	public void process(String hello) {
    		System.out.println("Receiver  : " + hello);
    	}
    }
    
  • 相关阅读:
    Sublime text 3支持utf-8
    ubuntu17.10 安装firefox的flash
    opencv mat裁剪
    Ubuntu寻找某某库
    Ubuntu的 g++ gcc版本升降级
    Autotools知识点
    Counted(内存管理机制)
    operator new和operator delete
    STL学习笔记:空间配置器allocator
    function call操作符(operator()) 仿函数(functor)
  • 原文地址:https://www.cnblogs.com/faramita/p/12779502.html
Copyright © 2011-2022 走看看