zoukankan      html  css  js  c++  java
  • RabbitMQ使用及与spring boot整合

    1.MQ

      消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法

      应用:不同进程Process/线程Thread之间通信

      比较流行的中间件:

        ActiveMQ

        RabbitMQ(非常重量级,更适合于企业级的开发)

        Kafka(高吞吐量的分布式发布订阅消息系统)

        RocketMQ

      在高并发、可靠性、成熟度等方面,RabbitMQ是首选

      Kafka的性能(吞吐量、TPS)比RabbitMq要高出来很多,但Kafka主要定位在日志方面,如果业务方面还是建议选择RabbitMq

    2.AMQP

      Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

      主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全

    3.RabbitMQ

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写

    支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX

    用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

    (1)安装

      需要先安装Erlang ,再安装RabbitMQ

      环境:win7

      Erlang

        下载 :

          https://www.erlang-solutions.com/resources/download.html 

        安装:

          双击下载的文件(esl-erlang_22.1~windows_amd64.exe) ,下一步进行安装

        安装完后开始菜单多了

          

      RabbitMQ

        下载 :

          https://www.rabbitmq.com/download.html

        安装:

          双击下载的文件(rabbitmq-server-3.8.1.exe) ,下一步进行安装

        安装完后开始菜单多了

          

         选择开始菜单的RabbitMQ Command Prompt(sbin dir)

         

        进入C:Program Files (x86)RabbitMQ Server abbitmq_server-3.4.1sbin输入命令

    rabbitmq-plugins enable rabbitmq_management

    启动了管理工具

    服务启动  net start RabbitMQ
    服务停止  net stop RabbitMQ

    服务启动后,浏览器打开http://localhost:15672/

    使用账号 guest ,密码 guest

    能够登录,安装成功

    (2)用户管理

      Admin选项卡

      A.添加用户

    用户角色:

        超级管理员(administrator)

        监控者(monitoring)

          策略制定者(policymaker)

        普通管理者(management)

        其他

      B.创建Virtual Hosts

       C.设置权限

       选中Admin用户,进入权限设置

     

       已添加权限

     (3)spring boot整合RabbitMQ

      添加依赖

    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
         <version>2.2.1.RELEASE</version>
    </dependency>

      添加配置

    #对于rabbitMQ的支持
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    spring.rabbitmq.virtual-host=testhost
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true

      添加RabbitMQ配置类

    package com.example.demo.configure;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMqConfig {
    
        public static final String RABBITMQ_QUEUE_NAME = "Queue1";
        public static final String RABBITMQ_ORDER_QUEUE_NAME = "OrderQueue1";
        private final static Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
        @Autowired
        private CachingConnectionFactory cachingConnectionFactory;
    
        @Bean
        public Queue commonQueue() {
            return new Queue(RabbitMqConfig.RABBITMQ_QUEUE_NAME);
        }
    
        @Bean
        public Queue orderQueue() {
            return new Queue(RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME);
        }
    
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange");
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        // 建立Queue与Exchange的绑定关系
        @Bean
        public Binding bindingExchangeMessage(Queue orderQueue, DirectExchange directExchange) {
            return BindingBuilder.bind(orderQueue).to(directExchange).with(
                    RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            cachingConnectionFactory.setPublisherConfirms(true);
            cachingConnectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack)
                    logger.info("消息发送成功: correlationData:({}),ack:({ack}),cause:({})", correlationData,
                            ack, cause);
                else
                    logger.info("消息发送失败: correlationData:({}),ack:({ack}),cause:({})", correlationData,
                            ack, cause);
            });
    
            rabbitTemplate.setReturnCallback(
                    (message, replyCode, replyText, exchange, routingKey) -> logger.info(
                            "消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange,
                            routingKey, replyCode, replyText, message));
    
            return rabbitTemplate;
        }
    }

      生产者

    package com.example.demo.mq;
    
    import com.example.demo.configure.RabbitMqConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderMaker {
        private final static Logger logger = LoggerFactory.getLogger(OrderMaker.class);
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(String content)
        {
            this.rabbitTemplate.convertAndSend(RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME,content);
        }
    }

      测试入口

    package com.example.demo.controller;
    
    import com.example.demo.mq.OrderMaker;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class Demo {
    
        @Autowired
        private OrderMaker orderMaker;
        @RequestMapping(value = "/testMq",method = RequestMethod.GET,produces = MediaType.ALL_VALUE)
        public String testMq(String msg)
        {
            orderMaker.send(msg);
            System.out.println(msg);
            return "Successfully.";
        }
    }

      使用postman测试http://127.0.0.1:8080/testMq?msg=hahaha,this is a test

      在http://localhost:15672中

      OrderQueue1队列有两条消息

      查看消息

     

        消费者

    package com.example.demo.mq;
    
    import com.example.demo.configure.RabbitMqConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = RabbitMqConfig.RABBITMQ_ORDER_QUEUE_NAME)
    public class OrderListener {
        private final static Logger logger = LoggerFactory.getLogger(OrderListener.class);
    
        @RabbitHandler
        public void process(String orderMsg)
        {
            logger.info("订单消费者收到消息:" + orderMsg);
        }
    }

      重新启动

      log输出

    2019-11-13 14:36:51.500 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  com.example.demo.mq.OrderListener - 订单消费者收到消息:hahaha,this is a test
    2019-11-13 14:36:51.516 [AMQP Connection 127.0.0.1:5672] INFO com.example.demo.configure.RabbitMqConfig - 消息发送成功: correlationData:(null),ack:({ack}),cause:(true)

    这样就实现了简单的队列,生产者将消息发送到队列,消费者从队列中获取消息

    P:消息的生产者
    C:消息的消费者
    红色:队列

  • 相关阅读:
    zabbix3.4监控Windows-CPU使用率磁盘IO磁盘监控阈值邮件报警详细配置
    网站打开提示Service Unavailable如何解决?
    windows服务器下zabbix agent的安装调试
    如何判断自己的VPS是那种虚拟技术实现的?
    win7/win10下KiWi Syslog服务器的安装与配置
    centos7.x内核参数优化脚本
    centos7.6更改主机名为FQDN格式不生效解决办法
    chmod: changing permissions of 'xxx': Operation not permitted
    编译安装nginx提示./configure: error: C compiler cc is not found
    centos7利用系统镜像修复grub
  • 原文地址:https://www.cnblogs.com/baby123/p/11848958.html
Copyright © 2011-2022 走看看