zoukankan      html  css  js  c++  java
  • RabbitMQ 第五课 RabbitMQ整合Spring AMQP实战

    一、RabbitAdmin

     1.引入AMQP依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2. 配置类

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class AMQPConfig {
        
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses("localhost");
            connectionFactory.setUsername("xiaochao");
            connectionFactory.setPassword("root");
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
        
        /** RabbitAdmin底层实现是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明 */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            //此处必须设置为true,否则Spring容器不会加载RabbitAdmin类
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
        //然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作
        //例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等
    }

    3. 测试类,用Java代码:RabbitAdmin来创建Exchange、Queue、Binding

    import java.util.HashMap;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.Binding.DestinationType;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitmqDemoApplicationTests {
        
        @Autowired
        private RabbitAdmin rabbitAdmin;
    
        @Test
        void contextLoads() {
            Exchange exchange1 = new ExchangeBuilder("test.direct.exchange", ExchangeTypes.DIRECT).build();
            Exchange exchange2 = new ExchangeBuilder("test.topic.exchange", ExchangeTypes.TOPIC).build();
            Exchange exchange3 = new ExchangeBuilder("test.fanout.exchange", ExchangeTypes.FANOUT).build();
            
            Queue queue1 = QueueBuilder.durable("test.direct.queue").autoDelete().expires(1500000).build();
            Queue queue2 = QueueBuilder.durable("test.topic.queue").autoDelete().expires(1500000).build();
            Queue queue3 = QueueBuilder.durable("test.fanout.queue").autoDelete().expires(1500000).build();
            
            Binding binding1 = new Binding("test.direct.queue", DestinationType.QUEUE, "test.direct.exchange", "directRoutingKey", new HashMap<>());
            Binding binding2 = BindingBuilder.bind(queue2).to(exchange2).with("topicRoutingKey.#").and(new HashMap<>());
            Binding binding3 = BindingBuilder.bind(queue3).to(exchange3).with("").noargs();//fanout不需要routingKey
            
            rabbitAdmin.declareExchange(exchange1);
            rabbitAdmin.declareExchange(exchange2);
            rabbitAdmin.declareExchange(exchange3);
            
            rabbitAdmin.declareQueue(queue1);
            rabbitAdmin.declareQueue(queue2);
            rabbitAdmin.declareQueue(queue3);
            
            rabbitAdmin.declareBinding(binding1);
            rabbitAdmin.declareBinding(binding2);
            rabbitAdmin.declareBinding(binding3);
            
            rabbitAdmin.purgeQueue("test.direct.queue", true);//清空队列
            
        }
    
    }

     4. 以配置类的形式创建Exchange、Queue、Binding

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ExchangeQueueBindingConfig {
        
        /**
         * 针对消费者配置
         * 1.设置交换机类型
         * 2.将队列绑定到交换机
         *         FanoutExchange:将消息分发到所有的绑定队列,无routingKey的概念
         *         HeaderExchange:通过添加属性key-value匹配
         *         DirectExchange:按照routingKey分发到指定队列
         *         TopicExchange:多关键字匹配
         */
        //交换机1、队列1、绑定1
        @Bean
        public TopicExchange exchange01() {
            return new TopicExchange("topicExchange01",true,false);
        }
        @Bean
        public Queue queue01() {
            return new Queue("queue01", true);
        }
        @Bean
        public Binding bingding01() {
            return BindingBuilder.bind(queue01()).to(exchange01()).with("spring.*");
        }
        
        //交换机2、队列2、绑定2
        @Bean
        public TopicExchange exchange02() {
            return new TopicExchange("topicExchange02",true,false);
        }
        @Bean
        public Queue queue02() {
            return new Queue("queue02", true);
        }
        @Bean
        public Binding bingding02() {
            return BindingBuilder.bind(queue02()).to(exchange02()).with("rabbit.*");
        }
        
        //交换机1、队列3、绑定3
        @Bean
        public Queue queue03() {
            return new Queue("queue03", true);
        }
        @Bean
        public Binding bingding03() {
            return BindingBuilder.bind(queue03()).to(exchange01()).with("mq.*");
        }
        
        //其它队列设置
        @Bean
        public Queue queue_image() {
            return new Queue("queue_image", true);
        }
        @Bean
        public Queue queue_pdf() {
            return QueueBuilder.durable("queue_pdf").build();
        }
    }
    ExchangeQueueBindingConfig.java

    二、消息模板 - RabbitTemplate

    我们在与SpringAMQP整合的时候进行发送消息的关键类,该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口

    confirmCallBack、返回值确认接口ReturnCalback等等。同样我们需要进行注入到Spring容器中,然后直接使用

    5.1注入消息模板

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    5.2 使用RabbitTemplate发送消息

    package com.everjiankang.dependency;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.MessagePropertiesBuilder;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitmqDemoApplicationTests {
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        @Test
        public void testSendMessage() {
            MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().build();
            messageProperties.getHeaders().put("desc", "信息描述");
            messageProperties.getHeaders().put("type", "自定义消息类型");
            String bodyStr = "这是消息体";
            Message message = new Message(bodyStr.getBytes(), messageProperties);
            rabbitTemplate.convertAndSend("topicExchange01", "spring.amqp", message,new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.out.println("----添加额外的设置----");
                    message.getMessageProperties().getHeaders().put("desc","额外修改的信息描述");
                    message.getMessageProperties().getHeaders().put("attr","额外新增的信息");
                    return message;
                }
            });
        }
    
    }
    @Test
    public void testSendMessage2() {
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().build();
        messageProperties.setContentType("text/plain");
        messageProperties.getHeaders().put("type", "自定义消息类型");
        Message message = new Message("这是消息体2".getBytes(), messageProperties);
        rabbitTemplate.send("topicExchange01", "spring.abc", message);
        
        rabbitTemplate.convertAndSend("topicExchange01", "spring.amqp", "hello world");
        rabbitTemplate.convertAndSend("topicExchange02", "rabbit.test", "hello world2");
    }

    三、简单消息监听容器 - SimpleMessageListenerContainer

    这个类非常强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足

    监听队列(多个队列)、自启动、自动声明功能

    1. 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等

    2. 设置消费者数量、最大最小数量、批量消费

    3. 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数

    4. 设置消费者标签生成策略、是否独占模式、消费者属性等

    5. 设置具体的监听器、消息转换器等

    6. 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等

    7. 很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性取实现的。所以可以看出SpringAMQP非常的强大。

    (请思考,为什么可以动态的感知配置变更?)

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("queue01","queue02","queue03");
        //container.setQueues(queue01(),queue02(),queue03());
        container.setMaxConcurrentConsumers(5);
        container.setConcurrentConsumers(1);
        //rejected:v.    拒绝接受; 不予考虑; 拒收; 不录用; 拒绝接纳; (因质量差) 不用,不出售,不出版;
        //默认消息被拒绝了后是否重回队列
        container.setDefaultRequeueRejected(false); 
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_RCM大帅哥超帅";
            }
        });
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());

            System.out.println("----ChannelAwareMessageListener_在监听:" + msg + "-----");

            }
        });
        return container;      
    }

    四、消息监听适配器 - MessageListenerAdapter

    1. 适配器 取代之前的ChannelAwareMessageListener

    @Bean
    public SimpleMessageListenerContainer simleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("queue01","queue02","queue03");
        //container.setQueues(queue01(),queue02(),queue03());
        container.setMaxConcurrentConsumers(5);
        container.setConcurrentConsumers(1);
        //rejected:v.    拒绝接受; 不予考虑; 拒收; 不录用; 拒绝接纳; (因质量差) 不用,不出售,不出版;
        //默认消息被拒绝了后是否重回队列
        container.setDefaultRequeueRejected(false); 
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_RCM大帅哥超帅";
            }
        });
    //        container.setMessageListener(new ChannelAwareMessageListener() {
    //            @Override
    //            public void onMessage(Message message, Channel channel) throws Exception {
    //                String msg = new String(message.getBody());
    //                System.out.println("----ChannelAwareMessageListener_在监听:" + msg + "-----");
    //            }
    //        });
        //    delegate 美[ˈdelɪɡət , ˈdelɪɡeɪt]
        //n. 代表; 会议代表;
        //v. 授(权); 把(工作、权力等)委托(给下级); 选派(某人做某事);
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage"); //自定义method,自定义的话默认走handleMessage方法
        adapter.setMessageConverter(new MyTextMessageConverter());//
        container.setMessageListener(adapter);  //adapter取代ChannelAwareMessageListener
        return container;
    }

    2.自定义委托类

    class MessageDelegate{
        public void handleMessage(byte[] messageBody) {    //默认方法,根据MessageListenerAdapter类的源码
            System.err.println("默认方法,消息内容:" + new String(messageBody));
        }
        public void consumeMessage(byte[] messageBody) { //自定义method的方法
            System.err.println("字节数组方法,消息内容:" + new String(messageBody));
        }
        public void consumeMessage(String messageBody) {    //设置adapter.setMessageConverter(new MyMessageConverter());所走的方法
            System.err.println("字符串方法,消息内容:"+ messageBody);
        }
    }

    五、MessageConverter

    我们在进行发送消息的时候,正常情况下消息体为二进制byte[]类型的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

    • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
    • 重写下面两个方法: toMessage:java对象转换为Message fromMessage:Message对象转换为java对象
    • Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能
    • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
    • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体

    1. 自定义MessageConverter:byte[] -> String

    class MyTextMessageConverter implements MessageConverter{
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            //Convert a Java object to a Message.
            return new Message(object.toString().getBytes(),messageProperties);
        }
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            // Convert from a Message to a Java object.
            String contentType = message.getMessageProperties().getContentType();
            if(null != contentType && contentType.contains("text"))
                return new String(message.getBody());
            return message.getBody();
        }
    }

    2. Jackson2JsonMessageConverter:byte[] -> json

    (1)设置转换器

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("jsonHandleMethod");
    adapter.setMessageConverter(new Jackson2JsonMessageConverter());
    container.setMessageListener(adapter);  //adapter取代ChannelAwareMessageListener

    (2)处理方法

    public class MessageDelegate{
        public void jsonHandleMethod(Map message) { //方法参数一定是Map类型,而且不能有泛型,否则报错
            System.err.println("Jackson2JsonMessageConverter : " + message);
        }
    } 

    (3) 单元测试类发送json字符串,且类型是application/json的Message

       @Test
        public void testSendMessage3() throws Exception{
            MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().build();
            messageProperties.setContentType("application/json");
            messageProperties.getHeaders().put("type", "自定义消息类型");
            JSONObject json = new JSONObject();
            json.put("id", 1);
            json.put("name", "xiaochao");
            String messageBodyStr = json.toString();
            Message message = new Message(messageBodyStr.getBytes(), messageProperties);
            rabbitTemplate.send("topicExchange01", "spring.abc", message);
            rabbitTemplate.convertAndSend("topicExchange01", "spring.amqp", message);
            rabbitTemplate.convertAndSend("topicExchange02", "rabbit.test", message);
        }

    3. DefaultJackson2JavaTypeMapper:byte[] -> Java对象

    (1)设置转换器

    //1.2 DefaultJackson2JavaTypeMapper 和 Jackson2JsonMessageConverter 支持java对象转换
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    //如果使用RabbitMQ默认的转换方式,并不会涉及到本章遇到的信任package问题,如果想自定义消息转换并且使用DefaultClassMapper作为映射,
    //肯定会出现信任package的问题,所以如果需要自定义转换的小伙伴,记住要设置trustedPackages。
    javaTypeMapper.addTrustedPackages("com.everjiankang.dependency.model");
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);

    (2)处理方法

    /**
     * 自定义委托类
     */
    class MessageDelegate{
        public void javaObjectHandleMethod(User user) {
            System.err.println("JavaTypeMapper for User: " + user);
        }
        public void javaObjectHandleMethod(Dog dog) {
            System.err.println("JavaTypeMapper for Dog : " + dog);
        }
    }

    (3)测试类

    @Test
    public void testSendMessage4() throws Exception{
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().build();
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__", "com.everjiankang.dependency.model.User");
        User user = new User(1,"xiaochao",22);
        String userStr = JSON.toJSONString(user);
    
        String messageBodyStr = userStr.toString();
        Message message = new Message(messageBodyStr.getBytes(), messageProperties);
        rabbitTemplate.send("topicExchange01", "spring.abc", message);
        rabbitTemplate.convertAndSend("topicExchange01", "spring.amqp", message);
        rabbitTemplate.convertAndSend("topicExchange02", "rabbit.test", message);
    }

     5. 支持java对象多映射转换

    //1.3 DefaultJackson2JavaTypeMapper 和 Jackson2JsonMessageConverter 支持java对象多映射转换
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("javaObjectHandleMethod");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();    
    idClassMapping.put("user", User.class);
    idClassMapping.put("dog", Dog.class);
    javaTypeMapper.addTrustedPackages("com.everjiankang.dependency.model");
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;

    测试代码:

    要在Headers中添加

    messageProperties2.getHeaders().put("__TypeId__", "user");

    @Test
        public void testSendMappingMessage() throws JsonProcessingException {
            Order order = new Order();
            order.setId("001");
            order.setName("订单消息");
            order.setContent("订单描述信息");
            
            ObjectMapper mapper = new ObjectMapper();
            String json1 = mapper.writeValueAsString(order);
            System.err.println("order 4 json:" + json1);
            
            MessageProperties messageProperties = new MessageProperties();
            //这里一定要修改contentType为application/json
            messageProperties.setContentType("application/json");
            messageProperties.getHeaders().put("__TypeId__", "order");
            
            Message message = new Message(json1.getBytes(), messageProperties);
            rabbitTemplate.send("topic001", "spring.order", message);
            
            Packaged packaged = new Packaged();
            packaged.setId("002");
            packaged.setName("包裹消息");
            packaged.setDescription("包裹描述信息");
            
            String json2 = mapper.writeValueAsString(packaged);
            System.err.println("packaged 4 json:" + json2);
            
            MessageProperties messageProperties2 = new MessageProperties();
            //这里一定要修改contentType为application/json
            messageProperties2.setContentType("application/json");
            messageProperties2.getHeaders().put("__TypeId__", "packaged");
            
            Message message2 = new Message(json2.getBytes(), messageProperties2);
            rabbitTemplate.send("topic001", "spring.packaged", message2);
        }

    4. 全局转换器:图片、PDF、PPT、流媒体等

    (1)更改适配器部分代码,加入全局转换器

        //1.4 全局转换器:convert
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            
            //全局转换器
            ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
            
            TextMessageConverter textConvert = new TextMessageConverter();
            convert.addDelegate("text", textConvert);
            convert.addDelegate("html/text", textConvert);
            convert.addDelegate("xml/text", textConvert);
            convert.addDelegate("text/plain", textConvert);
            
            Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
            convert.addDelegate("json", jsonConverter);
            convert.addDelegate("application/json", jsonConverter);
            
            ImageMessageConverter imageConverter = new ImageMessageConverter();
            convert.addDelegate("image/png", imageConverter);
            convert.addDelegate("image", imageConverter);
            
            PDFMessageConverter pdfConverter = new PDFMessageConverter();
            convert.addDelegate("application/pdf", pdfConverter);
            
            adapter.setMessageConverter(convert);
            container.setMessageListener(adapter);

     (2)MessageDelegate类的consumeMessage()添加方法

        public void consumeMessage(File file) {
            System.err.println("文件对象,消息内容, " + file.getName());
        }

    (3)添加自定义转换器ImageMessageConverter

    package com.dwz.spring.converter;
    
    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.util.FileCopyUtils;
    
    public class ImageMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("---------------Image MessageConverter-------------------");
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            String extName = _extName == null ? "png" : _extName.toString();
            
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "D:\dwz_temp\" + fileName + "." + extName;
            File f = new File(path);
            System.out.println(path);
            
            try {
                Files.copy(new ByteArrayInputStream(body), Paths.get(path), StandardCopyOption.REPLACE_EXISTING);
    //            FileCopyUtils.copy(body, f);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
        
    }

    (4)PDFMessageConverter

    package com.dwz.spring.converter;
    
    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.UUID;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    public class PDFMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("---------------PDF MessageConverter-------------------");
            
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "D:/dwz_temp/" + fileName + ".pdf";
            File f = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    }

    (5)测试代码:

      5.1 图片转换测试

       @Test
        public void testSendImgMessage1() throws IOException {
            byte[] body = Files.readAllBytes(Paths.get("C:/Users/Administrator/Pictures/Saved Pictures/img02/dwz.png"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("image/png");
            messageProperties.getHeaders().put("extName", "png");
            
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "image_queue", message);
        }

      5.2 pdf文件转换测试

        @Test
        public void testSendPDFMessage2() throws IOException {
            byte[] body = Files.readAllBytes(Paths.get("F:\dwz\my\2019全新Java学习路线图.pdf"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/pdf");
            
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "pdf_queue", message);
        }

    兄弟文章:自定义MessageConverter--消息转换器

  • 相关阅读:
    NPM 重新回炉
    构建工具
    工作的环境部署
    Proxy 代理
    Promise 的 用法及实现
    JS 的 继承
    动态规划——LCS
    矩阵连乘——动态规划
    线段树&树状数组
    SpringAOP
  • 原文地址:https://www.cnblogs.com/guchunchao/p/13154293.html
Copyright © 2011-2022 走看看