zoukankan      html  css  js  c++  java
  • springboot整合rabbitMQ

      当前社区活跃度最好的消息中间件就是kafka和rabbitmq了,前面对kafaka的基础使用做了一些总结,最近开始研究rabbitmq,查看了很多资料,自己仿着写了一些demo,在博客园记录一下。

    rabbitmq基础知识

      关于rabbitmq基础知识,可以看这篇博客,介绍的很详细了:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html,这里分享一张核心概念图

    rabbitmq安装

      rabbitmq的安装很简单,我们可以根据自己的系统去网上找对应的安装说明,这里我为了方便,采用docker镜像的方式,我的虚拟机装的是centos7。步骤如下:

      1、启动docker,关闭防火墙

      2、拉取镜像:docker pull rabbitmq,如需要管理界面:docker pull rabbitmq:management

      3、执行指令启动RabbitMQ

      无管理界面:

      docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 rabbitmq

      有管理界面:

      docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 -p 15672:15672 rabbitmq:management

      4、启动后输入你的虚拟机地址+端口号15672,即可访问到rabbitmq登录界面,默认用户名和密码都是guest。

    springboot与rabbitmq整合

      IDE:STS,这是spring官方推荐的开发工具,构建springboot项目非常方便。JDK:1.8

      1、pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>powerx.io</groupId>
        <artifactId>springboot-rabbitmq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>springboot-rabbitmq</name>
        <description>Demo project for Spring Boot</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    
    </project>

      2、定义常量

    package com.example.demo.constant;
    
    public interface QueueConstants {
    
        // 消息交换
        String MESSAGE_EXCHANGE = "message.direct.myexchange";
        // 消息队列名称
        String MESSAGE_QUEUE_NAME = "message.myqueue";
        // 消息路由键
        String MESSAGE_ROUTE_KEY = "message.myroute";
    
        // 死信消息交换
        String MESSAGE_EXCHANGE_DL = "message.direct.dlexchange";
        // 死信消息队列名称
        String MESSAGE_QUEUE_NAME_DL = "message.dlqueue";
        // 死信消息路由键
        String MESSAGE_ROUTE_KEY_DL = "message.dlroute";
    
    }

      3、rabbitmq配置

    package com.example.demo.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import com.example.demo.constant.QueueConstants;
    
    @Configuration
    public class MyRabbitMqConfiguration {
    
        /**
         * 交换配置
         *
         * @return
         */
        @Bean
        public DirectExchange messageDirectExchange() {
            return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE)
                    .durable(true)
                    .build();
        }
    
        /**
         * 消息队列声明
         *
         * @return
         */
        @Bean
        public Queue messageQueue() {
            return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME)
                    .build();
        }
    
        /**
         * 消息绑定
         *
         * @return
         */
        @Bean
        public Binding messageBinding() {
            return BindingBuilder.bind(messageQueue())
                    .to(messageDirectExchange())
                    .with(QueueConstants.MESSAGE_ROUTE_KEY);
        }
    
    
    
    }

      4、生产者

    package com.example.demo.producer;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.example.demo.constant.QueueConstants;
    
    @Component
    public class MessageProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendMessage(String str) {
            
            rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE, QueueConstants.MESSAGE_ROUTE_KEY, str);
        }
    
    }

      5、消费者

    package com.example.demo.consumer;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import com.example.demo.constant.QueueConstants;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class MessageConsumer {
        @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
        public void processMessage(Channel channel,Message  message) {
            System.out.println("MessageConsumer收到消息:"+new String(message.getBody()));
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                
            } 
        }
    }

      6、控制器类

    package com.example.demo.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.example.demo.producer.MessageProducer;
    
    @RestController
    public class TestController {
    
        @Autowired
        private MessageProducer messageProducer;
        
        @RequestMapping(value = "/index")
        public String index(String str) {
            // 将实体实例写入消息队列
            messageProducer.sendMessage(str);
            return "Success";
        }
    
    }

      7、application.properties

    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #服务器ip
    spring.rabbitmq.host=192.168.1.124
    #虚拟空间地址
    spring.rabbitmq.virtual-host=/
    #端口号
    spring.rabbitmq.port=5672

      至此,springboot整合rabbitmq基本demo完毕,这里不再贴出演示截图。

    消息序列化

      涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析,RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter。

      SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差;当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能。

      Jackson2JsonMessageConverter配置如下:

      1、消息发送者设置序列化方式 :rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

      2、消息消费者也应配置 MessageConverter 为 Jackson2JsonMessageConverter

    @Configuration
    public class RabbitMQConfig {
        
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
    }

      3、消费消息

    @Component
    @RabbitListener(queues = "consumer_queue")
    public class Receiver {
    
        @RabbitHandler
        public void processMessage1(@Payload User user) {
            System.out.println(user.getName());
        }
    
    }

    消息确认

      RabbitMQ的消息确认有两种。

      一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

      第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

    消息发送确认

      1、ConfirmCallback

      确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调,使用该功能需要开启确认,spring-boot中配置如下:

      spring.rabbitmq.publisher-confirms = true

      在MessageProducer.java加入如下代码:

    rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
                System.out.println("消息唯一标识" + correlationData);
                System.out.println("消息确认结果" + ack);
                System.out.println("失败原因" + cause);
            });

      2、ReturnCallback

      通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发),使用该功能需要开启确认,spring-boot中配置如下:spring.rabbitmq.publisher-returns = true

      在MessageProducer.java加入如下代码:

    rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,
                    String exchange, String routingKey) ->{
                        System.out.println("消息主体message" + message);
                        System.out.println("消息replyCode" + replyCode);
                        System.out.println("消息replyText" + replyText);
                        System.out.println("消息使用的交换器" + exchange);
                        System.out.println("消息使用的路由键" + routingKey);
                    });

    消息消费确认

      消费确认模式有三种:NONE、AUTO、MANUAL。

      开启手动确认需要在配置中加入:spring.rabbitmq.listener.direct.acknowledge-mode=manual

      消息在处理失败后将再次返回队列,重新尝试消费,如果再次失败则直接拒绝。

      实例代码如下:

    package com.example.demo.consumer;
    
    import java.io.IOException;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import com.example.demo.constant.QueueConstants;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class MessageConsumer {
        @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
        public void processMessage(Channel channel, Message message) {
            System.out.println("MessageConsumer收到消息:" + new String(message.getBody()));
            try {
                //模拟消息处理失败
                int a = 3 / 0;
                // false只确认当前一个消息收到,true确认所有consumer获得的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    
            } catch (Exception e) {
                if (message.getMessageProperties().getRedelivered()) {
    
                    System.out.println("消息已重复处理失败,拒绝再次接收...");
                    try {
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue为false,拒绝
                    } catch (IOException e1) {
                    }
    
                } else {
    
                    System.out.println("消息即将再次返回队列处理...");
    
                    try {
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue为true重新回到队列
                    } catch (IOException e1) {
                    }
    
                }
            }
        }
    }

    死信队列

      DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这样我们就可以重新去处理这个消息。DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理 。消息变成死信一向有一下几种情况:

      利用DLX,我们可以实现消息的延迟消费,可参考:https://www.jianshu.com/p/b74a14c7f31d,还可以像我的demo那样,对于有问题的消息进行重新处理,实例代码如下

      首先在MyRabbitMqConfiguration上加入如下配置:

    @Bean
        DirectExchange messagedlDirect() {
            return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE_DL).durable(true)
                    .build();
        }
    
        @Bean
        Queue messagedlQueue() {
            return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME_DL)
                    // 配置到期后转发的交换
                    .withArgument("x-dead-letter-exchange", QueueConstants.MESSAGE_EXCHANGE)
                    // 配置到期后转发的路由键
                    .withArgument("x-dead-letter-routing-key", QueueConstants.MESSAGE_ROUTE_KEY).build();
        }
    
        @Bean
        public Binding messageTtlBinding(Queue messagedlQueue, DirectExchange messagedlDirect) {
            return BindingBuilder.bind(messagedlQueue).to(messagedlDirect).with(QueueConstants.MESSAGE_ROUTE_KEY_DL);
        }

      其次,修改我们的消息发送者,发送消息到我们新加入的交换器和路由键上,如下:

    rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE_DL, QueueConstants.MESSAGE_ROUTE_KEY_DL, str);

      新添加一个消费者,同时将原来的消费者的监听队列换成新加入的

    package com.example.demo.consumer;
    
    import java.io.IOException;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import com.example.demo.constant.QueueConstants;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class MessageConsumer {
        @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME_DL)
        public void processMessage(Channel channel, Message message) {
            System.out.println("MessageConsumer收到消息:" + new String(message.getBody()));
            try {
                //模拟消息处理失败
                int a = 3 / 0;
                // false只确认当前一个消息收到,true确认所有consumer获得的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    
            } catch (Exception e) {
                if (message.getMessageProperties().getRedelivered()) {
    
                    System.out.println("消息已重复处理失败,拒绝再次接收...");
                    try {
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue为false,拒绝
                    } catch (IOException e1) {
                    }
    
                } else {
    
                    System.out.println("消息即将再次返回队列处理...");
    
                    try {
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue为true重新回到队列
                    } catch (IOException e1) {
                    }
    
                }
            }
        }
    }
    package com.example.demo.consumer;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import com.example.demo.constant.QueueConstants;
    import com.rabbitmq.client.Channel;
    
    @Component
    public class MessageConsumer2 {
        @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
        public void processMessage(Channel channel,Message  message) {
            System.out.println("MessageConsumer2收到消息:"+new String(message.getBody()));
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                
            } 
        }
    }

      启动项目,发送请求http://localhost:8082/index?str=asdfgh,可以看到后台日志如下:

     

      rabbitmq支持四种交换器,同时还支持很多种插件,功能非常强大,这里我自己还没亲手用过,所以不再展开。

  • 相关阅读:
    C#开发中is和as的区别
    Winform开发框架之系统登录实现
    C#几个经常犯错误汇总
    JavaScript事件冒泡简介及应用
    在C#的winForm程序中调用和执行javascript
    C#关于托管程序和非托管程序的区别
    分布式计算 网格计算 并行计算 云计算
    (转)960的秘密
    集群概念:集群技术简介(转)
    好用的Sql格式化工具
  • 原文地址:https://www.cnblogs.com/hhhshct/p/9718063.html
Copyright © 2011-2022 走看看