zoukankan      html  css  js  c++  java
  • RabbitMQ交换机和spring整合RabbitMQ

    1、交换机

    2、RabbitMQ整合springCloud

    交换机

    交换机属性:

    Name:交换机名称
    Type:交换机类型 direct、topic、fanout、headers
    Durability:是否需要持久化,true为持久化
    Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
    Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
    Arguments:扩展参数,用于扩展AMQP协议,定制化使用

    直流交换机
    直连交换机Direct Exchange(完全匹配路由key)

    所有发送到Direct Exchange的消息会被转发到RouteKey中指定的Queue

    【注意】Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃;

     

     消费端代码

    package com.cgl.rabbitmqapi.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 9:16
     */
    public class Consumer4DirectExchange {
        public static void main(String[] args) throws Exception {
    
    
            ConnectionFactory connectionFactory = new ConnectionFactory() ;
    
            connectionFactory.setHost("192.168.125.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
    
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while(true){
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产端代码

    package com.hmc.rabbitmqapi.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 9:17
     */
    public class Producer4DirectExchange {
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.125.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";
    //        String routingKey = "test.direct111"; //收不到
            //5 发送
    
            String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
            channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    
        }
    }

    主题交换机
    主题交换机Topic Exchange(匹配路由规则的交换机)

    所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上;

    Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic;

    【注意】可以使用通配符进行模糊匹配

    符号:“#” 匹配一个或者多个词
    符号:“”匹配不多不少一个词
    列如:
    “log.#” 能够匹配到 “log.info.oa”
    “log.” 能够匹配到 “log.err”


    消费端代码

    package com.hmc.rabbitmqapi.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 10:06
     */
    public class Consumer4TopicExchange {
        public static void main(String[] args) throws Exception {
    
    
            ConnectionFactory connectionFactory = new ConnectionFactory() ;
    
            connectionFactory.setHost("192.168.125.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            String routingKey = "user.#";
    //        String routingKey = "user.*";
            // 1 声明交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 2 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 3 建立交换机和队列的绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while(true){
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产端代码

    package com.hmc.rabbitmqapi.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 10:08
     */
    public class Producer4TopicExchange {
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.125.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";
            //5 发送
    
            String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
            channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
            channel.close();
            connection.close();
        }
    }

    输出交换机

    输出交换机Fanout Exchange(不做路由)

    不处理路由键,只需要简单的将队列绑定到交换机上;
    发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;
    Fanout交换机转发消息是最快的

     消费端代码

    package com.hmc.rabbitmqapi.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 10:21
     */
    public class Consumer4FanoutExchange {
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory connectionFactory = new ConnectionFactory() ;
    
            connectionFactory.setHost("192.168.125.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";    //不设置路由键
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while(true){
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产端代码

    package com.hmc.rabbitmqapi.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 10:21
     */
    public class Producer4FanoutExchange {
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.125.128");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for(int i = 0; i < 10; i ++) {
                String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
                channel.basicPublish(exchangeName, "", null , msg.getBytes());
            }
            channel.close();
            connection.close();
        }
    }

     

    Binding-绑定

    Exchange和Exchange、Queue之间的连接关系;

    Binding中可以包含RoutingKey或者参数

    Queue-消息队列

    消息队列,实际存储消息数据

    Durability:是否持久化

    Durable:是,Transient:否

    Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

    Message-消息

    服务器和应用程序之间传递的数据

    本质上就是一段数据,由Properties和Payload(Body)组成

    常用属性:delivery model、headers(自定义属性)

    Message-其他属性

    content_type、content_encoding、priority

    correlation_id、reply_to、expiration、message_id

    Timestamp、type、user_id、app_id、cluster_id

    Virtual host-虚拟主机

    虚拟地址,用于进行逻辑隔离,最上层的消息路由

    一个Virtual Host里面可以有若干个Exchange和Queue

    同一个Virtual Host里面不能有相同名称的Exchange或Queue

    小结:

    RabbitMQ的概念、安装与使用、管控台操作;

    结合RabbitMQ的特性、Exchange、Queue、Binding、RoutingKey、Message进行核心API的讲解

    RabbitMQ整合 SpringCloud实战

    注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效

    生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等

    消费端核心配置

    u 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理

    u 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

    @RabbitListener注解的使用

    消费端监听@RabbitListener注解,这个对于在实际工作中非常的好用

    u @RabbitListener是一个组合注解,里面可以注解配置(@QueueBinding、@Queue、@Exchange)直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等

    相关代码

    rabbitmq-common子项目

    package com.hmc.rabbitmqcommon.entity;
    
    import java.io.Serializable;
    
    /**
     * @author 胡明财
     * @site www.xiaomage.com
     * @company
     * @create  2019-12-21 10:48
     */
    public class Order implements Serializable {
    
        private String id;
        private String name;
    
        public Order() {
        }
        public Order(String id, String name) {
            super();
            this.id = id;
            this.name = name;
        }
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    
    }

    创建一个子项目:rabbitmq-springcloud-consumer

    Pom依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.hmc</groupId>
        <artifactId>rabbitmq-springcloud-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-springcloud-consumer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.cgl</groupId>
                <artifactId>rabbitmq-common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    Yml配置

    spring.rabbitmq.addresses=192.168.197.134:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    server.port=80
    server.servlet.context-path=/
    
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    
    spring.rabbitmq.listener.order.queue.name=queue-2
    spring.rabbitmq.listener.order.queue.durable=true
    spring.rabbitmq.listener.order.exchange.name=exchange-2
    spring.rabbitmq.listener.order.exchange.durable=true
    spring.rabbitmq.listener.order.exchange.type=topic
    spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
    spring.rabbitmq.listener.order.key=springboot.*

    RabbitReceiver

    package com.hmc.rabbitmqspringcloudconsumer.conusmer;
    
    import com.cgl.rabbitmqcommon.entity.Order;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 11:08
     */
    @Component
    public class RabbitReceiver {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue-1",
                        durable="true"),
                exchange = @Exchange(value = "exchange-1",
                        durable="true",
                        type= "topic",
                        ignoreDeclarationExceptions = "true"),
                key = "springboot.*"
        )
        )
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端Payload: " + message.getPayload());
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            //手工ACK
            channel.basicAck(deliveryTag, false);
        }
    
    
        /**
         *
         *     spring.rabbitmq.listener.order.queue.name=queue-2
         spring.rabbitmq.listener.order.queue.durable=true
         spring.rabbitmq.listener.order.exchange.name=exchange-1
         spring.rabbitmq.listener.order.exchange.durable=true
         spring.rabbitmq.listener.order.exchange.type=topic
         spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
         spring.rabbitmq.listener.order.key=springboot.*
         * @param order
         * @param channel
         * @param headers
         * @throws Exception
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                        durable="${spring.rabbitmq.listener.order.queue.durable}"),
                exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                        durable="${spring.rabbitmq.listener.order.exchange.durable}",
                        type= "${spring.rabbitmq.listener.order.exchange.type}",
                        ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
                key = "${spring.rabbitmq.listener.order.key}"
        )
        )
        @RabbitHandler
        public void onOrderMessage(@Payload Order order,
                                   Channel channel,
                                   @Headers Map<String, Object> headers) throws Exception {
            System.err.println("--------------------------------------");
            System.err.println("消费端order: " + order.getId());
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            //手工ACK
            channel.basicAck(deliveryTag, false);
        }
    
    
    }

    MainConfig.java

    package com.cgl.rabbitmqspringcloudconsumer;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author caoguangli
     * @company
     * @create  2019-12-21 11:18
     */
    
    @Configuration
    @ComponentScan({"com.cgl.rabbitmqspringcloudconsumer.*"})
    public class MainConfig {
    }

    创建项目 :rabbitmq-springcloud-producer

    Pom依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.cgl</groupId>
        <artifactId>rabbitmq-springcloud-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-springcloud-producer</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>com.cgl</groupId>
                <artifactId>rabbitmq-common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    Yml配置

    spring.rabbitmq.addresses=192.168.197.134:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true

    RabbitSender.java

    package com.hmc.rabbitmqspringcloudproducer.producer;
    
    import com.hmc.rabbitmqcommon.entity.Order;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 11:25
     */
    @Component
    public class RabbitSender {
        //自动注入RabbitTemplate模板类
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //回调函数: confirm确认
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                System.err.println("ack: " + ack);
                if(!ack){
                    System.err.println("异常处理....");
                }
            }
        };
    
        //回调函数: return返回
        final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: "
                        + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
    
        //发送消息方法调用: 构建Message消息
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageHeaders mhs = new MessageHeaders(properties);
            Message msg = MessageBuilder.createMessage(message, mhs);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //id + 时间戳 全局唯一
            CorrelationData correlationData = new CorrelationData("1234567890");
            rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
        }
    
        //发送消息方法调用: 构建自定义对象消息
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //id + 时间戳 全局唯一
            CorrelationData correlationData = new CorrelationData("0987654321");
            rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
        }
    }

    MainConfig.java

    package com.hmc.rabbitmqspringcloudproducer;
    
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author 胡明财
     * @company
     * @create  2019-12-21 11:28
     */
    
    @Configuration
    @ComponentScan({"com.cgl.rabbitmqspringcloudproducer.*"})
    public class MainConfig {
    }

    测试代码

    package com.hmc.rabbitmqspringcloudproducer;
    
    import com.hmc.rabbitmqcommon.entity.Order;
    import com.cgl.rabbitmqspringcloudproducer.producer.RabbitSender;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqSpringcloudProducerApplicationTests {
    
        @Autowired
        private RabbitSender rabbitSender;
    
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
        @Test
        public void testSender1() throws Exception {
            Map<String, Object> properties = new HashMap<>();
            properties.put("number", "12345");
            properties.put("send_time", simpleDateFormat.format(new Date()));
            rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
        }
    
        @Test
        public void testSender2() throws Exception {
            Order order = new Order("001", "第一个订单");
            rabbitSender.sendOrder(order);
        }
    
    }

     

     

  • 相关阅读:
    Sprinig.net 双向绑定 Bidirectional data binding and data model management 和 UpdatePanel
    Memcached是什么
    Spring.net 网络示例 codeproject
    jquery.modalbox.show 插件
    UVA 639 Don't Get Rooked
    UVA 539 The Settlers of Catan
    UVA 301 Transportation
    UVA 331 Mapping the Swaps
    UVA 216 Getting in Line
    UVA 10344 23 out of 5
  • 原文地址:https://www.cnblogs.com/xmf3628/p/12081857.html
Copyright © 2011-2022 走看看