zoukankan      html  css  js  c++  java
  • 深入浅出 RabbitMQ

    什么是 RabbitMQ

    简介(优点)

    • 基于 ErLang 语言开发有高可用高并发的优点,适合集群。
    • 开源、稳定、易用、跨平台、支持多种语言、文档齐全。
    • 有消息确认机制和持久化机制,可靠性高。

    概念

    生产者和消费者

    • Producer:消息的生产者
    • Consumer:消息的消费者

    Queue

    • 消息队列提供了 FIFO 的处理机制,具有缓存消息的能力。在 RabbitMQ 中,队列消息可以设置为持久化,临时或者自动删除。
    • 如果是持久化的队列,Queue 中的消息会在 Server 本地硬盘存储一份,防止系统 Crash 数据丢失。
    • 如果是临时的队列,Queue 中的数据在系统重启之后就会丢失。
    • 如实是自动删除的队列,当不存在用户连接到 Server,队列中的数据会被自动删除。

    ExChange

    ExChange 类似于数据通信网络中的交换机,提供消息路由策略。

    RabbitMQ 中,生产者不是将消息直接发送给 Queue,而是先发送给 ExChangeExChange 根据生产者传递的 key 按照特定的路由算法将消息给指定的 Queue。一个 ExChange 可以绑定多个 Queue。和 Queue 一样,ExChange 也可以设置为持久化、临时或者自动删除。

    Binding

    所谓绑定就是将一个特定的 ExChange 和一个特定的 Queue 绑定起来。ExChangeQueue 的绑定可以是多对多的关系。

    Virtual Host

    RabbitMQ Server上可以创建多个虚拟的 Message Broker(又叫做 Virtual Hosts)。每一个 vhost 本质上是一个迷你的 RabbitMQ Server,分别管理各自的 ExChangebinding。生产者和消费者连接 RabbitMQ Server 需要指定一个 Virtual Host

    使用过程

    1. 客户端连接到消息队列服务器,打开一个 Channel
    2. 客户端声明一个 ExChange,并设置相关属性。
    3. 客户端声明一个 Queue,并设置相关属性。
    4. 客户端使用 Routing Key,在 ExChangeQueue 之间建立好绑定关系。
    5. 客户端投递消息到 ExChange
    6. ExChange 接收到消息后,就根据消息的 key 和已经设置的 bingding,进行消息路由,将消息投递到一个或多个队列里。

    部署 RabbitMQ

    使用 Docker Compose 部署

    创建 docker-compose.yml

    version: '3.1'
    services:
      rabbitmq:
        restart: always
        image: rabbitmq:management
        container_name: rabbitmq
        ports:
          - 5672:5672
          - 15672:15672
        environment:
          TZ: Asia/Shanghai
          RABBITMQ_DEFAULT_USER: rabbit
          RABBITMQ_DEFAULT_PASS: 123456
        volumes:
          - ./data:/var/lib/rabbitmq
    

    RabbitMQ WebUI 界面

    • 访问地址:http://{ip}:15672

    • 首页

    • Global counts

    • 交换机页

    • 队列页

      • Name:消息队列的名称,这里是通过程序创建的

      • Features:消息队列的类型,durable:true 为会持久化消息

      • Ready:准备好的消息

      • Unacked:未确认的消息

      • Total:全部消息

        如果都为 0 则说明全部消息处理完成

    使用 RabbitMQ

    创建生产者

    创建一个名为 spring-boot-amqp-provider 的生产者项目。

    相关配置

    • 创建 application.yml 文件

      spring:
        application:
          name: spring-boot-amqp
        rabbitmq:
          host: 192.168.75.133
          port: 5672
          username: rabbit
          password: 123456
      
    • 创建队列

      import org.springframework.amqp.core.Queue;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      /**
       * 队列配置
       */
      @Configuration
      public class RabbitMQConfiguration {
          
          @Bean
          public Queue queue() {
              return new Queue("helloRabbitMQ");
          }
      }
      
    • 创建消息提供者

      import org.springframework.amqp.core.AmqpTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      import java.util.Date;
      
      /**
       * 消息提供者
       */
      @Component
      public class RabbitMQProvider {
          
          @Autowired
          private AmqpTemplate amqpTemplate;
      
          public void send() {
              String context = "hello" + new Date();
              System.out.println("Provider: " + context);
              amqpTemplate.convertAndSend("helloRabbitMQ", context);
          }
      }
      

    发送消息

    创建测试用例

    import com.lusifer.spring.boot.amqp.Application;
    import com.lusifer.spring.boot.amqp.provider.HelloRabbitProvider;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    public class AmqpTest {
        
        @Autowired
        private HelloRabbitProvider helloRabbitProvider;
    
        @Test
        public void testSender() {
            for (int i = 0; i < 10; i++) {
                RabbitMQProvider.send();
            }
        }
    }
    

    创建消费者

    创建一个名为 spring-boot-amqp-consumer 的消费者项目。

    相关配置

    创建 application.yml 文件

    spring:
      application:
        name: spring-boot-amqp-consumer
      rabbitmq:
        host: 192.168.75.133
        port: 5672
        username: rabbit
        password: 123456
    

    接收消息

    创建消息的消费监听组件

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "helloRabbitMQ")
    public class HelloRabbitConsumer {
        
        @RabbitHandler
        public void process(String message) {
            System.out.println("Consumer: " + message);
        }
    }
    
  • 相关阅读:
    Cloudstack安装(二)
    Cloudstack介绍(一)
    [Python爬虫] 之二十二:Selenium +phantomjs 利用 pyquery抓取界面网站数据
    [Python爬虫] 之二十一:Selenium +phantomjs 利用 pyquery抓取36氪网站数据
    [Python爬虫] 之二十:Selenium +phantomjs 利用 pyquery通过搜狗搜索引擎数据
    [Python爬虫] 之十九:Selenium +phantomjs 利用 pyquery抓取超级TV网数据
    [Python爬虫] 之十八:Selenium +phantomjs 利用 pyquery抓取电视之家网数据
    [Python爬虫] 之十七:Selenium +phantomjs 利用 pyquery抓取梅花网数据
    pycharm2016序列号失效问题解决办法
    scrapy-splash抓取动态数据例子十六
  • 原文地址:https://www.cnblogs.com/antoniopeng/p/13335373.html
Copyright © 2011-2022 走看看