zoukankan      html  css  js  c++  java
  • springboot整合RabbitMQ

    RabbitMQ整合 SpringCloud实战

    注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效

    生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等

    消费端核心配置

    u 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理

    u 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

    @RabbitListener注解的使用

    消费端监听@RabbitListener注解,这个对于在实际工作中非常的好用

    u @RabbitListener是一个组合注解,里面可以注解配置(@QueueBinding@Queue@Exchange)直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等

    注:由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置

    相关代码

    rabbitmq-common子项目

    package com.huang.rabbitmqcommon.entity;
    
    import java.io.Serializable;
    
    /**
     * @auther 宇晨
     * @site www.cnblogs.com/bf6rc9qu/
     * @company
     * @create 2019-12-22-22:30
     */
    
    public class Order implements Serializable {
    
        private String id;
        private String name;
    
        public Order() {
        }
        public Order(String id, String name) {
            super();
            this.id = id;
            this.name = name;
        }
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    
    
    }

    rabbitmq-springcloud-consumer子项目

    Pom依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.huang</groupId>
        <artifactId>rabbitmq-springcloud-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-springcloud-consumer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.huang</groupId>
                <artifactId>rabbitmq-common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    Yml配置

    spring.rabbitmq.addresses=192.168.188.131:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    server.port=88
    server.servlet.context-path=/
    
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    
    spring.rabbitmq.listener.order.queue.name=queue-2
    spring.rabbitmq.listener.order.queue.durable=true
    spring.rabbitmq.listener.order.exchange.name=exchange-2
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
    spring.rabbitmq.listener.order.key=springboot.*

    RabbitReceiver

    package com.huang.rabbitmqspringcloudconsumer.conusmer;
    
    import com.huang.rabbitmqcommon.entity.Order;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component
    public class RabbitReceiver {
    
       
       @RabbitListener(bindings = @QueueBinding(
             value = @Queue(value = "queue-1", 
             durable="true"),
             exchange = @Exchange(value = "exchange-1", 
             durable="true", 
             type= "topic", 
             ignoreDeclarationExceptions = "true"),
             key = "springboot.*"
             )
       )
       @RabbitHandler
       public void onMessage(Message message, Channel channel) throws Exception {
          System.err.println("--------------------------------------");
          System.err.println("消费端P bayload: " + message.getPayload());
          Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
          //手工ACK
          channel.basicAck(deliveryTag, false);
       }
       
       
       /**
        * 
        *     spring.rabbitmq.listener.order.queue.name=queue-2
          spring.rabbitmq.listener.order.queue.durable=true
          spring.rabbitmq.listener.order.exchange.name=exchange-1
          spring.rabbitmq.listener.order.exchange.durable=true
          spring.rabbitmq.listener.order.exchange.type=topic
          spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
          spring.rabbitmq.listener.order.key=springboot.*
        * @param order
        * @param channel
        * @param headers
        * @throws Exception
        */
       @RabbitListener(bindings = @QueueBinding(
             value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
             durable="${spring.rabbitmq.listener.order.queue.durable}"),
             exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
             durable="${spring.rabbitmq.listener.order.exchange.durable}", 
             type= "${spring.rabbitmq.listener.order.exchange.type}", 
             ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
             key = "${spring.rabbitmq.listener.order.key}"
             )
       )
       @RabbitHandler
       public void onOrderMessage(@Payload Order order,
             Channel channel, 
             @Headers Map<String, Object> headers) throws Exception {
          System.err.println("--------------------------------------");
          System.err.println("消费端order: " + order.getId());
          Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
          //手工ACK
          channel.basicAck(deliveryTag, false);
       }
       
       
    }

    MainConfig.java

    package com.huang.rabbitmqspringcloudconsumer;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ComponentScan({"com.huang.rabbitmqspringcloudconsumer.*"})
    public class MainConfig {
    
    }

    rabbitmq-springcloud-producer子项目

    Pom依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <artifactId>rabbitmq-springcloud-producer</artifactId>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>com.huang</groupId>
                <artifactId>rabbitmq-common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    Yml配置

    spring.rabbitmq.addresses=192.168.188.131:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

    RabbitSender.java

    package com.huang.rabbitmqspringcloudproducer.producer;
    
    import com.huang.rabbitmqcommon.entity.Order;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    
    @Component
    public class RabbitSender {
    
       //自动注入RabbitTemplate模板类
       @Autowired
       private RabbitTemplate rabbitTemplate;  
       
       //回调函数: confirm确认
       final ConfirmCallback confirmCallback = new ConfirmCallback() {
          @Override
          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
             System.err.println("correlationData: " + correlationData);
             System.err.println("ack: " + ack);
             if(!ack){
                System.err.println("异常处理....");
             }
          }
       };
       
       //回调函数: return返回
       final ReturnCallback returnCallback = new ReturnCallback() {
          @Override
          public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
             System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
          }
       };
       
       //发送消息方法调用: 构建Message消息
       public void send(Object message, Map<String, Object> properties) throws Exception {
          MessageHeaders mhs = new MessageHeaders(properties);
          Message msg = MessageBuilder.createMessage(message, mhs);
          rabbitTemplate.setConfirmCallback(confirmCallback);
          rabbitTemplate.setReturnCallback(returnCallback);
          //id + 时间戳 全局唯一 
          CorrelationData correlationData = new CorrelationData("1234567890");
          rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
       }
       
       //发送消息方法调用: 构建自定义对象消息
       public void sendOrder(Order order) throws Exception {
          rabbitTemplate.setConfirmCallback(confirmCallback);
          rabbitTemplate.setReturnCallback(returnCallback);
          //id + 时间戳 全局唯一 
          CorrelationData correlationData = new CorrelationData("0987654321");
          rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
       }
       
    }

    MainConfig.java

    package com.huang.rabbitmqspringcloudproducer;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ComponentScan({"com.huang.rabbitmqspringcloudproducer.*"})
    public class MainConfig {
    
    

    测试代码

    package com.huang.rabbitmqspringcloudproducer;
    
    import com.huang.rabbitmqcommon.entity.Order;
    import com.huang.rabbitmqspringcloudproducer.producer.RabbitSender;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqSpringcloudProducerApplicationTests {
    
        @Autowired
        private RabbitSender rabbitSender;
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
        @Test
        public void testSender1() throws Exception {
            Map<String, Object> properties = new HashMap<>();
            properties.put("number", "12345");
            properties.put("send_time", simpleDateFormat.format(new Date()));
            rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
        }
    
        @Test
        public void testSender2() throws Exception {
            Order order = new Order("001", "第一个订单");
            rabbitSender.sendOrder(order);
        }
    
    }

  • 相关阅读:
    HDU Railroad (记忆化)
    HDU 1227 Fast Food
    HDU 3008 Warcraft
    asp vbscript 检测客户端浏览器和操作系统(也可以易于升级到ASP.NET)
    Csharp 讀取大文本文件數據到DataTable中,大批量插入到數據庫中
    csharp 在万年历中计算显示农历日子出错
    csharp create ICS file extension
    CSS DIV Shadow
    DataTable search keyword
    User select fontface/color/size/backgroundColor设置 字体,颜色,大小,背景色兼容主流浏览器
  • 原文地址:https://www.cnblogs.com/bf6rc9qu/p/12081708.html
Copyright © 2011-2022 走看看