zoukankan      html  css  js  c++  java
  • springboot 2.X 集成RabbitMQ 详解(一) direct + fanout

    direct 模式
    路由键完全匹配时,消息被投递到对应的队列,也就是传统的点对点消息发送。RabbitMQ 的默认交换器(名称为空字符串)就是一个 direct 交换器,每个队列都会以队列名作为路由键自动绑定到它。
    fanout 模式
    消息广播到绑定的队列,不管队列绑定了什么路由键,消息经过交换器,每个队列都有一份。此模式只和交换器有关联。
    topic 模式
    通过使用"*"和"#"通配符进行处理,使来自不同源头的消息到达同一个队列,"."将路由键分为了几个标识符,"*"匹配 1 个标识符,"#"匹配零个或多个标识符。

    • 项目结构(包含springboot的大部分第三方框架整合,持续更新中。。。)

    • 引入父模块pom文件

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Parent (aggregator) POM of the boot_frame project: lists the sub-modules and the
         properties/dependencies declared once for all of them. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.monco</groupId>
        <artifactId>boot_frame</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>
    
        <description> springboot 框架基础集成</description>
        <!-- Sub-modules built together with this parent -->
        <modules>
            <module>boot_security</module>
            <module>boot_redis</module>
            <module>boot_common</module>
            <module>boot_core</module>
            <module>boot_activemq</module>
            <module>boot_rabbitmq</module>
            <module>boot_kafka</module>
            <module>boot_shiro</module>
            <module>boot_job</module>
            <module>boot_web</module>
        </modules>
    
        <!-- Inherit dependency management from the Spring Boot starter parent -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
            <relativePath/>
        </parent>
    
        <properties>
            <!-- Source/report encoding and JDK version -->
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <!-- Spring Boot starter version and versions of other third-party libraries -->
            <spring.boot.version>2.0.0.RELEASE</spring.boot.version>
            <mysql.connector.java.version>5.1.34</mysql.connector.java.version>
            <commons.lang3.version>3.4</commons.lang3.version>
            <druid.version>1.1.10</druid.version>
            <swagger2.version>2.7.0</swagger2.version>
            <joda.time.version>2.9.9</joda.time.version>
        </properties>
    
        <!-- Dependencies declared here are inherited by every module -->
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- Lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    
            <!-- Apache Commons Lang -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${commons.lang3.version}</version>
            </dependency>
        </dependencies>
    </project>
    
    • 引入RabbitMQ模块pom文件
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- POM of the boot_rabbitmq module; inherits from the boot_frame parent. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>boot_frame</artifactId>
            <groupId>com.monco</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>boot_rabbitmq</artifactId>
    
        <description>rabbit mq</description>
    
        <dependencies>
            <!-- RabbitMQ (AMQP) messaging starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <!-- Web starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
    </project>
    
    • 引入application.properties
    # RabbitMQ server host (replace IP with the broker's address)
    spring.rabbitmq.host=IP
    # Default AMQP port. Keep the value free of trailing whitespace: properties values
    # retain trailing spaces, and this value is concatenated into "host:port" by RabbitConfig.
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=monco
    spring.rabbitmq.password=123456
    # Enable publisher confirms
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.virtual-host=/
    # Listener containers use manual acknowledgement
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # HTTP server port
    server.port=8888
    

    配置类 RabbitConfig :

    package com.monco.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RabbitMQ configuration: builds the connection factory and template, and declares the
     * queues, exchanges and bindings used by the direct, fanout, topic and dead-letter demos.
     *
     * @author monco
     * @date 2020/4/13
     */
    @Slf4j
    @Configuration
    public class RabbitConfig {

        /** Name of the regular topic exchange used by the dead-letter demo. */
        private static final String NORMAL_EXCHANGE = "normal_exchange";

        /** Name of the dead-letter exchange (DLX). */
        private static final String DLX_EXCHANGE = "dlx_exchange";

        /** Routing key applied to messages when they are dead-lettered. */
        private static final String DLX_ROUTING_KEY = "dlx.message";

        @Value("${spring.rabbitmq.host}")
        private String address;

        @Value("${spring.rabbitmq.port}")
        private String port;

        @Value("${spring.rabbitmq.username}")
        private String username;

        @Value("${spring.rabbitmq.password}")
        private String password;

        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;

        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean publisherConfirms;

        /**
         * Builds the caching connection factory from the {@code spring.rabbitmq.*} properties.
         *
         * @return the configured connection factory
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            // Properties values keep stray whitespace; trim so "host:port" stays parseable.
            connectionFactory.setAddresses(address.trim() + ":" + port.trim());
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            return connectionFactory;
        }

        /**
         * Builds the template used by the senders, with the confirm and return callbacks attached.
         *
         * @return the configured template
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
            // Mandatory publishing is switched on together with the return callback below.
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(confirmCallback());
            rabbitTemplate.setReturnCallback(returnCallback());
            return rabbitTemplate;
        }

        //---------------------- point-to-point (direct) mode ----------------------

        /**
         * Declares the "hello" queue using the five-argument {@link Queue} constructor:
         * <ul>
         *   <li>name - queue name</li>
         *   <li>durable - whether the queue survives a broker restart</li>
         *   <li>exclusive - whether the queue is reserved for a single consumer instead of being
         *       shared by several consumers in round-robin fashion</li>
         *   <li>autoDelete - whether the queue is deleted automatically</li>
         *   <li>arguments - additional declaration arguments</li>
         * </ul>
         *
         * @return the "hello" queue, declared durable, exclusive and auto-delete
         */
        @Bean
        public Queue helloQueue() {
            return new Queue("hello", true, true, true, new HashMap<>());
        }

        /**
         * Declares the "object" queue with the constructor defaults.
         *
         * @return the "object" queue
         */
        @Bean
        public Queue objectQueue() {
            return new Queue("object");
        }


        //---------------------- fanout mode ----------------------

        /**
         * Declares the broadcast queue.
         *
         * @return the "monco.fanout" queue
         */
        @Bean
        public Queue fanoutQueue() {
            return new Queue("monco.fanout");
        }

        /**
         * Declares the broadcast exchange.
         *
         * @return the fanout exchange named by {@link RmConst#EXCHANGE_FANOUT}
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(RmConst.EXCHANGE_FANOUT);
        }

        /**
         * Binds the broadcast queue to the broadcast exchange.
         *
         * @param fanoutQueue    the broadcast queue
         * @param fanoutExchange the broadcast exchange
         * @return the binding between the two
         */
        @Bean
        Binding bindingExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
        }


        //---------------------- topic mode ----------------------

        /**
         * Declares the topic exchange. The queues of this demo are named by the
         * {@code RmConst.QUEUE_TOPIC_*} constants and are bound below with the patterns
         * {@code monco.topic.email} and {@code monco.*.user}.
         *
         * @return the topic exchange named by {@link RmConst#EXCHANGE_TOPIC}
         */
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(RmConst.EXCHANGE_TOPIC);
        }

        /**
         * Declares the e-mail queue of the topic demo.
         *
         * @return the queue named by {@link RmConst#QUEUE_TOPIC_EMAIL}
         */
        @Bean
        public Queue queueEmailMessage() {
            return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
        }

        /**
         * Declares the user queue of the topic demo.
         *
         * @return the queue named by {@link RmConst#QUEUE_TOPIC_USER}
         */
        @Bean
        public Queue queueUserMessage() {
            return new Queue(RmConst.QUEUE_TOPIC_USER);
        }

        /**
         * Declares the "all" queue of the topic demo.
         *
         * @return the queue named by {@link RmConst#QUEUE_TOPIC_All}
         */
        @Bean
        public Queue queueAllMessage() {
            return new Queue(RmConst.QUEUE_TOPIC_All);
        }

        /**
         * Binds the e-mail queue to the topic exchange with the exact key {@code monco.topic.email}.
         *
         * @return the binding
         */
        @Bean
        public Binding bindingEmailExchangeMessage() {
            return BindingBuilder
                    .bind(queueEmailMessage())
                    .to(topicExchange())
                    .with("monco.topic.email");
        }

        /**
         * Binds the user queue to the topic exchange with the pattern {@code monco.*.user}.
         *
         * @return the binding
         */
        @Bean
        public Binding bindingUserExchangeMessages() {
            return BindingBuilder
                    .bind(queueUserMessage())
                    .to(topicExchange())
                    .with("monco.*.user");
        }

        /**
         * Binds the "all" queue to the topic exchange with the pattern {@code monco.*.user}.
         * NOTE(review): this is the same pattern as the user-queue binding; if this queue is meant
         * to receive every message, a broader pattern such as {@code monco.#} may have been
         * intended - confirm before changing.
         *
         * @return the binding
         */
        @Bean
        public Binding bindingAllExchangeMessages() {
            return BindingBuilder
                    .bind(queueAllMessage())
                    .to(topicExchange())
                    .with("monco.*.user");
        }


        //---------------------- dead-letter queue mode ----------------------

        /**
         * Declares the regular topic exchange that receives the incoming messages.
         *
         * @return the "normal_exchange" topic exchange
         */
        @Bean
        public TopicExchange normalExchange() {
            return new TopicExchange(NORMAL_EXCHANGE);
        }

        /**
         * Declares the dead-letter exchange (DLX).
         *
         * @return the "dlx_exchange" topic exchange
         */
        @Bean
        public TopicExchange dlxExchange() {
            return new TopicExchange(DLX_EXCHANGE);
        }

        /**
         * Declares the regular queue whose rejected or expired messages are dead-lettered.
         *
         * @return the "monco.reject.queue" queue with its dead-letter arguments
         */
        @Bean
        public Queue rejectQueue() {
            Map<String, Object> args = new HashMap<>();
            // Exchange that receives dead-lettered messages.
            args.put("x-dead-letter-exchange", DLX_EXCHANGE);
            // Routing key used when a message is dead-lettered.
            args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            // Message time-to-live in milliseconds (applies to messages, not to the queue itself).
            args.put("x-message-ttl", 4000);
            return new Queue("monco.reject.queue", true, false, false, args);
        }

        /**
         * Declares the queue that receives the dead-lettered messages.
         *
         * @return the "monco.accept.queue" queue
         */
        @Bean
        public Queue acceptQueue() {
            return new Queue("monco.accept.queue");
        }

        /**
         * Binds the reject queue to the regular exchange with the pattern {@code normal.#}.
         *
         * @return the binding
         */
        @Bean
        public Binding bingRejectQueue() {
            return BindingBuilder
                    .bind(rejectQueue())
                    .to(normalExchange())
                    .with("normal.#");
        }

        /**
         * Binds the accept queue to the dead-letter exchange with the pattern {@code dlx.#}.
         *
         * @return the binding
         */
        @Bean
        public Binding bingAcceptQueue() {
            return BindingBuilder
                    .bind(acceptQueue())
                    .to(dlxExchange())
                    .with("dlx.#");
        }

        /**
         * Publisher-confirm callback: logs whether the broker acknowledged the published message.
         *
         * @return the confirm callback
         */
        @Bean
        public RabbitTemplate.ConfirmCallback confirmCallback() {
            return (correlationData, ack, cause) -> {
                if (ack) {
                    log.info("发送者发送mq成功");
                } else {
                    log.error("发送者发送mq失败,原因如下:{}", cause);
                }
            };
        }

        /**
         * Return callback: logs the details of a message that was handed back to the publisher,
         * which the original author notes is mainly a routing failure.
         *
         * @return the return callback
         */
        @Bean
        public RabbitTemplate.ReturnCallback returnCallback() {
            return (message, replyCode, replyText, exchange, routingKey) -> {
                log.error("发送者路由失败,请检查路由");
                log.error("Returned replyCode: {}", replyCode);
                log.error("Returned replyText: {}", replyText);
                log.error("Returned exchange: {}", exchange);
                log.error("Returned routingKey: {}", routingKey);
                // Decode with an explicit charset instead of the platform default.
                String msgJson = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                log.error("Returned message {}", msgJson);
            };
        }
    }
    
    • 常量类 RmConst
    package com.monco.config;
    
    /**
     * Shared names of the RabbitMQ exchanges and queues used by the demos.
     * This class only holds constants and cannot be instantiated.
     *
     * @author monco
     * @date 2020/4/14
     */
    public final class RmConst {

        /** Name of the e-mail queue of the topic demo. */
        public static final String QUEUE_TOPIC_EMAIL = "monco.topic.email";

        /** Name of the user queue of the topic demo. */
        public static final String QUEUE_TOPIC_USER = "monco.topic.user";

        /** Name of the "all" queue of the topic demo. */
        public static final String QUEUE_TOPIC_ALL = "monco.monco.user";

        /** Same value as {@link #QUEUE_TOPIC_ALL}; the original spelling is kept for existing callers. */
        public static final String QUEUE_TOPIC_All = QUEUE_TOPIC_ALL;

        /** Name of the topic exchange. */
        public static final String EXCHANGE_TOPIC = "monco.topic.exchange";

        /** Name of the fanout (broadcast) exchange. */
        public static final String EXCHANGE_FANOUT = "monco.fanout.exchange";

        private RmConst() {
            throw new AssertionError("RmConst is a constants holder and must not be instantiated");
        }
    }
    
    • direct 模式:
    • 生产者:
    package com.monco.sender;
    
    import com.monco.model.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * Producer for the direct-mode demo: publishes to the "hello" and "object" routing keys.
     *
     * @author monco
     * @date 2020/4/13
     */
    @Component
    public class HelloSender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Publishes a greeting that carries the current date, echoing it to stdout first.
         */
        public void send() {
            final String greeting = "hello " + new Date();
            System.out.println("Sender : " + greeting);
            rabbitTemplate.convertAndSend("hello", greeting);
        }

        /**
         * Publishes a {@link User}, echoing its string form to stdout first.
         * Per the original author's note, the User type must be serializable or sending fails.
         *
         * @param user the user to publish
         */
        public void send(User user) {
            final String description = user.toString();
            System.out.println("Sender object: " + description);
            rabbitTemplate.convertAndSend("object", user);
        }
    }
    
    • 消费者:
    package com.monco.receiver;
    
    import com.monco.model.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer for the direct-mode demo: listens on the "hello" and "object" queues.
     * NOTE(review): the accompanying application.properties sets the listener acknowledge mode
     * to manual, and neither method here acknowledges its delivery - confirm whether an explicit
     * ack is intended.
     *
     * @author monco
     * @date 2020/4/13
     */
    @Component
    public class HelloReceiver {

        /**
         * Prints a text message received from the "hello" queue.
         *
         * @param hello the message payload
         */
        // @RabbitHandler removed: it belongs to methods of a class-level @RabbitListener,
        // not next to a method-level @RabbitListener.
        @RabbitListener(queues = "hello")
        public void process(String hello) {
            System.out.println("Receiver  : " + hello);
        }


        /**
         * Prints a {@link User} received from the "object" queue together with its name and age.
         *
         * @param user the received user
         */
        @RabbitListener(queues = "object")
        public void process(User user) {
            System.out.println("Receiver object : " + user);
            System.out.println("username:" + user.getUsername());
            System.out.println("age:" + user.getAge());
        }
    }
    
    

    • fanout 模式: fanout 模式 生产者和消费者只和 exchange交换器有关 和 路由键无关 直接绑定队列即可
    • 生产者:
    package com.monco.sender;
    
    import com.monco.config.RmConst;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the fanout demo: publishes to the exchange named by
     * {@link RmConst#EXCHANGE_FANOUT} with an empty routing key.
     *
     * @author monco
     * @date 2020/4/14
     */
    @Component
    public class FanoutSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Appends the current time in milliseconds to the text, echoes it to stdout and
         * publishes it to the fanout exchange.
         *
         * @param msg the text to broadcast
         */
        public void send(String msg) {
            final long timestamp = System.currentTimeMillis();
            final String payload = msg + "---" + timestamp;
            System.out.println("FanoutSender : " + payload);
            rabbitTemplate.convertAndSend(RmConst.EXCHANGE_FANOUT, "", payload);
        }
    }
    
    • 消费者:
    package com.monco.receiver;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer for the fanout demo: listens on the "monco.fanout" queue.
     * NOTE(review): the accompanying application.properties sets the listener acknowledge mode
     * to manual, and this method does not acknowledge its delivery - confirm whether an explicit
     * ack is intended.
     *
     * @author monco
     * @date 2020/4/14
     */
    @Component
    public class FanoutReceiver {

        /**
         * Prints a broadcast message received from the "monco.fanout" queue.
         *
         * @param hello the message payload
         */
        // @RabbitHandler removed: it belongs to methods of a class-level @RabbitListener,
        // not next to a method-level @RabbitListener.
        @RabbitListener(queues = "monco.fanout")
        public void process(String hello) {
            System.out.println("FanoutReceiver : " + hello);
        }
    }
    
    
    • 测试类:
    package com.monco;
    
    import com.monco.model.User;
    import com.monco.sender.DlxSender;
    import com.monco.sender.FanoutSender;
    import com.monco.sender.HelloSender;
    import com.monco.sender.TopicSender;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot tests that drive the direct-mode and fanout-mode senders.
     *
     * @author monco
     * @date 2020/4/13
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqTest {

        @Autowired
        private FanoutSender fanoutSender;

        @Autowired
        private HelloSender helloSender;

        /** Sends a single text message through the direct-mode sender. */
        @Test
        public void contextLoads() {
            helloSender.send();
        }

        /** Sends a {@link User} object through the direct-mode sender. */
        @Test
        public void sendObject() {
            helloSender.send(new User("monco", 18));
        }

        /** Sends one broadcast message through the fanout sender. */
        @Test
        public void sendFanoutMessage() {
            final String broadcastText = "广播消息发送";
            fanoutSender.send(broadcastText);
        }
    }
    
    

    这些是RabbitMQ两种基本的交换器的介绍,之后两篇分别介绍topic模式,手动应答模式,死信队列模式,因为这几个模块比较复杂,所以单独分析。

  • 相关阅读:
    C#_WinForm接收命令行参数
    SQLite不支持的SQL语法总结
    Thirft框架介绍
    jquery获取复选框的值
    REST构架风格介绍:状态表述转移
    RESTful HTTP的实践infoQ
    C#如何在webBrowser1控件通过TagName,Name查找元素(没有ID时)遍历窗体元素
    Html TO Ubb and Ubb TO Html
    SQL 位运算
    Memcached真的过时了吗?【转帖】
  • 原文地址:https://www.cnblogs.com/monco-sxy/p/12721777.html
Copyright © 2011-2022 走看看