zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件(第四章)第二部分

    本章导航

    • RabbitMQ整合Spring AMQP实战
    • RabbitMQ整合Spring Boot实战
    • RabbitMQ整合Spring Cloud实战

    RabbitMQ整合Spring AMQP实战

    • RabbitAdmin
    • SpringAMQP声明
    • SimpleMessageListenerContainer简单消息监听容器
    • MessageListenerAdapter消息监听适配器
    • MessageConverter 消息转换器,序列化和反序列化等操作
    • 注意:autoStartup必须要设置为true,否则Spring容器不会加载RabbitAdmin类
    • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
    • 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
    • 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等

    MessageListenerAdapter

    MessageListenerAdapter即消息监听适配器

    • 通过messageListenerAdapter的代码我们可以看出如下核心属性
    • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
    • Delegate委托对象:实际真实的委托对象,用于处理消息
    • queueOrTagToMethodName队列标识与方法名称组成的集合
    • 可以一一进行队列与方法名称的匹配
    • 队列和方法名称绑定,即指定队列里的消息会被所绑定的方法接收并处理

     代码实现

    package com.cx.temp.common.rabbitmq.spring;
    
    import com.cx.temp.common.rabbitmq.spring.adapter.MessageDelegate;
    import com.cx.temp.common.rabbitmq.spring.convert.TextMessageConverter;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.ConsumerTagStrategy;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.UUID;
    
    /**
     * Spring AMQP configuration for the RabbitMQ demo.
     *
     * <p>Declares the connection factory, {@link RabbitAdmin}, {@link RabbitTemplate}, the
     * exchanges/queues/bindings used by the examples, and a
     * {@link SimpleMessageListenerContainer} that dispatches incoming messages to a
     * {@link MessageDelegate} through a {@link MessageListenerAdapter}.
     */
    @Configuration
    @ComponentScan({"com.cx.temp.*"})
    public class RabbitMQConfig {

        /**
         * Creates the connection factory pointing at the local broker.
         *
         * <p>NOTE(review): address, credentials and virtual host are hard-coded for the demo;
         * externalize them before using this outside a tutorial.
         *
         * @return the caching connection factory shared by all AMQP beans
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("127.0.0.1:5672");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("/test001");
            return connectionFactory;
        }

        /**
         * Registers the {@link RabbitAdmin} used to declare exchanges, queues and bindings.
         *
         * <p>The {@code @Bean} annotation is required: without it this method is never invoked
         * by Spring and the auto-startup setting below has no effect.
         *
         * @param connectionFactory the broker connection factory
         * @return the admin with auto-startup enabled
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }

        /**
         * Consumer-side topology:
         * 1. choose the exchange type
         * 2. bind queues to the exchange
         * FanoutExchange: delivers to every bound queue, no routing key involved
         * HeadersExchange: matches on key-value header attributes
         * DirectExchange: delivers to the queue matching the routing key
         * TopicExchange: pattern (multi-keyword) matching on the routing key
         *
         * @return the topic exchange named {@code topic001}
         */
        @Bean
        public TopicExchange exchange001() {
            return new TopicExchange("topic001", true, false);
        }

        /** @return the durable queue {@code queue001} */
        @Bean
        public Queue queue001() {
            return new Queue("queue001", true); // durable queue
        }

        /** @return binding of {@code queue001} to {@code topic001} with key {@code spring.*} */
        @Bean
        public Binding binding001() {
            return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
        }

        /** @return the topic exchange named {@code topic002} */
        @Bean
        public TopicExchange exchange002() {
            return new TopicExchange("topic002", true, false);
        }

        /** @return the durable queue {@code queue002} */
        @Bean
        public Queue queue002() {
            return new Queue("queue002", true); // durable queue
        }

        /** @return the durable queue {@code queue003} */
        @Bean
        public Queue queue003() {
            return new Queue("queue003", true); // durable queue
        }

        /** @return binding of {@code queue002} to {@code topic002} with key {@code rabbit.*} */
        @Bean
        public Binding binding002() {
            return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
        }

        /** @return binding of {@code queue003} to {@code topic002} with key {@code spring.*} */
        @Bean
        public Binding binding003() {
            return BindingBuilder.bind(queue003()).to(exchange002()).with("spring.*");
        }

        /** @return the durable queue {@code image_queue} */
        @Bean
        public Queue queue_image() {
            return new Queue("image_queue", true); // durable queue
        }

        /** @return the durable queue {@code pdf_queue} */
        @Bean
        public Queue queue_pdf() {
            return new Queue("pdf_queue", true); // durable queue
        }

        /**
         * @param connectionFactory the broker connection factory
         * @return the template used by the tests to publish messages
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }

        /**
         * Builds the listener container that consumes from all demo queues and hands each
         * message to {@link MessageDelegate#consumeMessage} via a {@link MessageListenerAdapter}.
         *
         * @param connectionFactory the broker connection factory
         * @return the configured listener container
         */
        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            container.setDefaultRequeueRejected(false); // rejected messages are not requeued
            container.setAcknowledgeMode(AcknowledgeMode.AUTO); // automatic acknowledgement
            // Consumer tag = queue name + random UUID.
            container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());

            // Alternative: consume the raw message directly.
    //        container.setMessageListener(new ChannelAwareMessageListener() {
    //            @Override
    //            public void onMessage(Message message, Channel channel) throws Exception {
    //                String msg = new String(message.getBody());
    //                System.err.println("-----------消费者:" + msg);
    //            }
    //        });

            // Variant 1: adapter with the default method name (handleMessage).
    //        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    //        container.setMessageListener(adapter);

            // Variant 2: adapter with a custom default method name (the default is handleMessage).
    //        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    //        adapter.setDefaultListenerMethod("consumeMessage");

            // Variant 3: custom method name plus a converter that turns text payloads into String.
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            adapter.setMessageConverter(new TextMessageConverter());
            container.setMessageListener(adapter);

            return container;
        }

    }

    字节转换器

    package com.cx.temp.common.rabbitmq.spring.convert;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    /**
     * Converter that treats {@code text/*} payloads as UTF-8 strings and passes every other
     * payload through as the raw byte array.
     */
    public class TextMessageConverter implements MessageConverter {

        /**
         * Encodes {@code o.toString()} as UTF-8 and wraps it in a {@link Message}.
         *
         * @param o the payload; must not be {@code null}
         * @param messageProperties the properties attached to the created message
         * @return the message carrying the UTF-8 encoded text
         * @throws MessageConversionException if {@code o} is {@code null}
         */
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            if (o == null) {
                throw new MessageConversionException("Cannot convert a null payload to a Message");
            }
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            byte[] body = o.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            return new Message(body, messageProperties);
        }

        /**
         * Decodes the body as a UTF-8 string when the content type contains {@code "text"};
         * otherwise returns the body bytes unchanged.
         *
         * @param message the incoming message
         * @return a {@link String} for text content types, else the raw {@code byte[]}
         */
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if (contentType != null && contentType.contains("text")) {
                return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            }
            return message.getBody();
        }

    }

    自定义输出设置

    package com.cx.temp.common.rabbitmq.spring.adapter;
    
    /**
     * Delegate whose methods are invoked reflectively by {@code MessageListenerAdapter}.
     *
     * <p>Both a {@code byte[]} and a {@code String} overload of {@code consumeMessage} are
     * provided, because the configured {@code TextMessageConverter} yields a {@code String}
     * for text content types and the raw {@code byte[]} for everything else.
     */
    public class MessageDelegate {

        // The adapter's default method name is fixed: MessageListenerAdapter declares
        // [public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";]
        // and looks that method up by reflection.
        // Variant 1 (older versions deliver byte data by default)
    //    public void handleMessage(byte[] messageBody) {
    //        System.out.println("默认方法,消息内容:" + new String(messageBody));
    //    }

        // Variant 1 (the version demonstrated here delivers a String by default)
    //    public void handleMessage(String messageBody) {
    //        System.out.println("默认方法,消息内容:" + messageBody);
    //    }

        /**
         * Variant 2: handles payloads delivered as raw bytes.
         *
         * @param messageBody the message body, decoded as UTF-8 for printing
         */
        public void consumeMessage(byte[] messageBody) {
            System.out.println("字节数组方法,消息内容:"
                    + new String(messageBody, java.nio.charset.StandardCharsets.UTF_8));
        }

        /**
         * Variant 2: handles payloads already converted to text by the message converter.
         *
         * @param messageBody the message body as text
         */
        public void consumeMessage(String messageBody) {
            System.out.println("字符串方法,消息内容:" + messageBody);
        }

    //    public void method1(String messageBody) {
    //
    //    }

    }

    测试类

    package com.cx.temp.rabbitmq;
    
    import com.cx.temp.admin.AdminApplication;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import java.util.HashMap;
    
    /**
     * Integration tests that exercise {@link RabbitAdmin} declarations and
     * {@link RabbitTemplate} publishing against the configured broker.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = AdminApplication.class)
    public class RabbitMQTest {

        @Autowired
        private RabbitAdmin rabbitAdmin;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /** Declares exchanges, queues and bindings, then purges one queue. */
        @Test
        public void testAdmin() throws Exception {

            // Style 1: declare each element explicitly.
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

            Binding directBinding = new Binding(
                    "test.direct.queue", Binding.DestinationType.QUEUE,
                    "test.direct", "direct", new HashMap<>());
            rabbitAdmin.declareBinding(directBinding);

            // Style 2: fluent declaration through BindingBuilder.
            Binding topicBinding = BindingBuilder
                    .bind(new Queue("test.topic.queue", false))
                    .to(new TopicExchange("test.topic", false, false))
                    .with("user.#");
            rabbitAdmin.declareBinding(topicBinding);

            Binding fanoutBinding = BindingBuilder
                    .bind(new Queue("test.fanout.queue", false))
                    .to(new FanoutExchange("test.fanout", false, false));
            rabbitAdmin.declareBinding(fanoutBinding);

            // Remove all messages currently in the queue.
            rabbitAdmin.purgeQueue("test.topic.queue", false);
        }

        /** Publishes a message with custom headers and a post-processor that amends them. */
        @Test
        public void testSendMessage() throws Exception {
            // 1. Build the message.
            MessageProperties props = new MessageProperties();
            props.getHeaders().put("desc", "信息描述..");
            props.getHeaders().put("type", "自定义消息类型..");
            Message outbound = new Message("Hello RabbitMQ".getBytes(), props);

            // The post-processor is applied by the template to the message being sent.
            MessagePostProcessor amendHeaders = processed -> {
                System.err.println("----------添加额外的设置-----------");
                processed.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                processed.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return processed;
            };

            rabbitTemplate.convertAndSend("topic001", "spring.amqp", outbound, amendHeaders);
        }

        /** Publishes one pre-built text message and two plain String payloads. */
        @Test
        public void testSendMessage2() throws Exception {
            rabbitTemplate.send("topic001", "spring.abc", textMessage("mq 消息1234"));

            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
        }

        /** Publishes a single {@code text/plain} message to {@code topic001}. */
        @Test
        public void testSendMessage4Text() throws Exception {
            rabbitTemplate.send("topic001", "spring.abc", textMessage("mq 消息1234"));
        }

        /** Builds a message with content type {@code text/plain} carrying {@code body}. */
        private static Message textMessage(String body) {
            MessageProperties props = new MessageProperties();
            props.setContentType("text/plain");
            return new Message(body.getBytes(), props);
        }

    }

    执行测试类

    testSendMessage4Text方法

    • queueOrTagToMethodName队列标识与方法名称组成的集合
    package com.cx.temp.common.rabbitmq.spring;
    
    import com.cx.temp.common.rabbitmq.spring.adapter.MessageDelegate;
    import com.cx.temp.common.rabbitmq.spring.convert.TextMessageConverter;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.ConsumerTagStrategy;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * Spring AMQP configuration for the RabbitMQ demo (queue-to-method mapping variant).
     *
     * <p>Declares the connection factory, {@link RabbitAdmin}, {@link RabbitTemplate}, the
     * exchanges/queues/bindings used by the examples, and a
     * {@link SimpleMessageListenerContainer} whose {@link MessageListenerAdapter} routes
     * messages to {@link MessageDelegate} methods according to the source queue.
     */
    @Configuration
    @ComponentScan({"com.cx.temp.*"})
    public class RabbitMQConfig {

        /**
         * Creates the connection factory pointing at the local broker.
         *
         * <p>NOTE(review): address, credentials and virtual host are hard-coded for the demo;
         * externalize them before using this outside a tutorial.
         *
         * @return the caching connection factory shared by all AMQP beans
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("127.0.0.1:5672");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("/test001");
            return connectionFactory;
        }

        /**
         * Registers the {@link RabbitAdmin} used to declare exchanges, queues and bindings.
         *
         * <p>The {@code @Bean} annotation is required: without it this method is never invoked
         * by Spring and the auto-startup setting below has no effect.
         *
         * @param connectionFactory the broker connection factory
         * @return the admin with auto-startup enabled
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }

        /**
         * Consumer-side topology:
         * 1. choose the exchange type
         * 2. bind queues to the exchange
         * FanoutExchange: delivers to every bound queue, no routing key involved
         * HeadersExchange: matches on key-value header attributes
         * DirectExchange: delivers to the queue matching the routing key
         * TopicExchange: pattern (multi-keyword) matching on the routing key
         *
         * @return the topic exchange named {@code topic001}
         */
        @Bean
        public TopicExchange exchange001() {
            return new TopicExchange("topic001", true, false);
        }

        /** @return the durable queue {@code queue001} */
        @Bean
        public Queue queue001() {
            return new Queue("queue001", true); // durable queue
        }

        /** @return binding of {@code queue001} to {@code topic001} with key {@code spring.*} */
        @Bean
        public Binding binding001() {
            return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
        }

        /** @return the topic exchange named {@code topic002} */
        @Bean
        public TopicExchange exchange002() {
            return new TopicExchange("topic002", true, false);
        }

        /** @return the durable queue {@code queue002} */
        @Bean
        public Queue queue002() {
            return new Queue("queue002", true); // durable queue
        }

        /** @return the durable queue {@code queue003} */
        @Bean
        public Queue queue003() {
            return new Queue("queue003", true); // durable queue
        }

        /** @return binding of {@code queue002} to {@code topic002} with key {@code rabbit.*} */
        @Bean
        public Binding binding002() {
            return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
        }

        /** @return binding of {@code queue003} to {@code topic001} with key {@code mq.*} */
        @Bean
        public Binding binding003() {
            return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");
        }

        /** @return the durable queue {@code image_queue} */
        @Bean
        public Queue queue_image() {
            return new Queue("image_queue", true); // durable queue
        }

        /** @return the durable queue {@code pdf_queue} */
        @Bean
        public Queue queue_pdf() {
            return new Queue("pdf_queue", true); // durable queue
        }

        /**
         * @param connectionFactory the broker connection factory
         * @return the template used by the tests to publish messages
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }

        /**
         * Builds the listener container that consumes from all demo queues and routes each
         * message to a {@link MessageDelegate} method chosen by the queue it arrived on.
         *
         * @param connectionFactory the broker connection factory
         * @return the configured listener container
         */
        @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            container.setDefaultRequeueRejected(false); // rejected messages are not requeued
            container.setAcknowledgeMode(AcknowledgeMode.AUTO); // automatic acknowledgement
            // Consumer tag = queue name + random UUID.
            container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());

            // Alternative: consume the raw message directly.
    //        container.setMessageListener(new ChannelAwareMessageListener() {
    //            @Override
    //            public void onMessage(Message message, Channel channel) throws Exception {
    //                String msg = new String(message.getBody());
    //                System.err.println("-----------消费者:" + msg);
    //            }
    //        });

            // Variant 1: adapter with the default method name (handleMessage).
    //        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    //        container.setMessageListener(adapter);

            // Variant 2: adapter with a custom default method name (the default is handleMessage).
    //        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    //        adapter.setDefaultListenerMethod("consumeMessage");

            // Variant 3, option 1: custom method name plus a converter from bytes to String.
    //        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    //        adapter.setDefaultListenerMethod("consumeMessage");
    //        adapter.setMessageConverter(new TextMessageConverter());
    //        container.setMessageListener(adapter);

            // Variant 3, option 2: map individual queues to individual delegate methods.
            Map<String, String> queueOrTagToMethodName = new HashMap<>();
            queueOrTagToMethodName.put("queue001", "method1");
            queueOrTagToMethodName.put("queue002", "method2");

            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            // Queues without an entry in the map fall back to the default listener method;
            // the delegate has no handleMessage, so point the fallback at consumeMessage.
            adapter.setDefaultListenerMethod("consumeMessage");
            adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
            adapter.setMessageConverter(new TextMessageConverter());
            container.setMessageListener(adapter);

            return container;
        }

    }
    package com.cx.temp.common.rabbitmq.spring.convert;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    /**
     * Converter that treats {@code text/*} payloads as UTF-8 strings and passes every other
     * payload through as the raw byte array.
     */
    public class TextMessageConverter implements MessageConverter {

        /**
         * Encodes {@code o.toString()} as UTF-8 and wraps it in a {@link Message}.
         *
         * @param o the payload; must not be {@code null}
         * @param messageProperties the properties attached to the created message
         * @return the message carrying the UTF-8 encoded text
         * @throws MessageConversionException if {@code o} is {@code null}
         */
        @Override
        public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
            if (o == null) {
                throw new MessageConversionException("Cannot convert a null payload to a Message");
            }
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            byte[] body = o.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            return new Message(body, messageProperties);
        }

        /**
         * Decodes the body as a UTF-8 string when the content type contains {@code "text"};
         * otherwise returns the body bytes unchanged.
         *
         * @param message the incoming message
         * @return a {@link String} for text content types, else the raw {@code byte[]}
         */
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if (contentType != null && contentType.contains("text")) {
                return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            }
            return message.getBody();
        }

    }
    package com.cx.temp.common.rabbitmq.spring.adapter;
    
    /**
     * Delegate whose methods are invoked reflectively by {@code MessageListenerAdapter}.
     *
     * <p>Each handler exists in a {@code String} and a {@code byte[]} form, because the
     * configured {@code TextMessageConverter} yields a {@code String} for text content types
     * and the raw {@code byte[]} for everything else. The {@code byte[]} forms decode the
     * body as UTF-8 and forward to the {@code String} form.
     */
    public class MessageDelegate {

        // The adapter's default method name is fixed: MessageListenerAdapter declares
        // [public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";]
        // and looks that method up by reflection.
        // Variant 1 (older versions deliver byte data by default)
    //    public void handleMessage(byte[] messageBody) {
    //        System.out.println("默认方法,消息内容:" + new String(messageBody));
    //    }

        // Variant 1 (the version demonstrated here delivers a String by default)
    //    public void handleMessage(String messageBody) {
    //        System.out.println("默认方法,消息内容:" + messageBody);
    //    }

        /**
         * Variant 2: custom default listener method for text payloads.
         *
         * @param messageBody the message body as text
         */
        public void consumeMessage(String messageBody) {
            System.out.println("字符串方法,消息内容:" + messageBody);
        }

        /** Variant 2 for payloads delivered as raw bytes. */
        public void consumeMessage(byte[] messageBody) {
            consumeMessage(decode(messageBody));
        }

        /**
         * Handler mapped to {@code queue001} by the adapter configuration.
         *
         * @param messageBody the message body as text
         */
        public void method1(String messageBody) {
            System.out.println("method1收到消息内容:" + messageBody);
        }

        /** {@code method1} for payloads delivered as raw bytes. */
        public void method1(byte[] messageBody) {
            method1(decode(messageBody));
        }

        /**
         * Handler mapped to {@code queue002} by the adapter configuration.
         *
         * @param messageBody the message body as text
         */
        public void method2(String messageBody) {
            System.out.println("method2收到消息内容:" + messageBody);
        }

        /** {@code method2} for payloads delivered as raw bytes. */
        public void method2(byte[] messageBody) {
            method2(decode(messageBody));
        }

        /** Decodes a raw message body as UTF-8 text. */
        private static String decode(byte[] messageBody) {
            return new String(messageBody, java.nio.charset.StandardCharsets.UTF_8);
        }

    }
    package com.cx.temp.rabbitmq;
    
    import com.cx.temp.admin.AdminApplication;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import java.util.HashMap;
    
    /**
     * Integration tests that exercise {@link RabbitAdmin} declarations and
     * {@link RabbitTemplate} publishing against the configured broker.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = AdminApplication.class)
    public class RabbitMQTest {

        @Autowired
        private RabbitAdmin rabbitAdmin;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /** Declares exchanges, queues and bindings, then purges one queue. */
        @Test
        public void testAdmin() throws Exception {

            // Style 1: declare each element explicitly.
            rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
            rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

            rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
            rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

            Binding directBinding = new Binding(
                    "test.direct.queue", Binding.DestinationType.QUEUE,
                    "test.direct", "direct", new HashMap<>());
            rabbitAdmin.declareBinding(directBinding);

            // Style 2: fluent declaration through BindingBuilder.
            Binding topicBinding = BindingBuilder
                    .bind(new Queue("test.topic.queue", false))
                    .to(new TopicExchange("test.topic", false, false))
                    .with("user.#");
            rabbitAdmin.declareBinding(topicBinding);

            Binding fanoutBinding = BindingBuilder
                    .bind(new Queue("test.fanout.queue", false))
                    .to(new FanoutExchange("test.fanout", false, false));
            rabbitAdmin.declareBinding(fanoutBinding);

            // Remove all messages currently in the queue.
            rabbitAdmin.purgeQueue("test.topic.queue", false);
        }

        /** Publishes a message with custom headers and a post-processor that amends them. */
        @Test
        public void testSendMessage() throws Exception {
            // 1. Build the message.
            MessageProperties props = new MessageProperties();
            props.getHeaders().put("desc", "信息描述..");
            props.getHeaders().put("type", "自定义消息类型..");
            Message outbound = new Message("Hello RabbitMQ".getBytes(), props);

            // The post-processor is applied by the template to the message being sent.
            MessagePostProcessor amendHeaders = processed -> {
                System.err.println("----------添加额外的设置-----------");
                processed.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                processed.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return processed;
            };

            rabbitTemplate.convertAndSend("topic001", "spring.amqp", outbound, amendHeaders);
        }

        /** Publishes one pre-built text message and two plain String payloads. */
        @Test
        public void testSendMessage2() throws Exception {
            rabbitTemplate.send("topic001", "spring.abc", textMessage("mq 消息1234"));

            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
        }

        /** Publishes the same {@code text/plain} message to {@code topic001} and {@code topic002}. */
        @Test
        public void testSendMessage4Text() throws Exception {
            Message text = textMessage("mq 消息1234");

            rabbitTemplate.send("topic001", "spring.abc", text);
            rabbitTemplate.send("topic002", "rabbit.abc", text);
        }

        /** Builds a message with content type {@code text/plain} carrying {@code body}. */
        private static Message textMessage(String body) {
            MessageProperties props = new MessageProperties();
            props.setContentType("text/plain");
            return new Message(body.getBytes(), props);
        }

    }

    启动Application后执行

    testSendMessage4Text控制台输出

  • 相关阅读:
    gRPC初识
    Go操作MySQL
    Go语言操作Redis
    Markdown 教程
    Go操作MongoDB
    Go操作NSQ
    Go操作kafka
    Go操作etcd
    Go语言获取系统性能数据gopsutil库
    influxDB
  • 原文地址:https://www.cnblogs.com/huihui-hui/p/14397118.html
Copyright © 2011-2022 走看看