zoukankan      html  css  js  c++  java
  • 自定义MessageConverter--消息转换器

    我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter

    • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
    • 重写下面两个方法: toMessage:java对象转换为Message fromMessage:Message对象转换为java对象
    • Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能
    • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
    • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体

    在pom.xml文件中加入依赖:

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.0</version>
        </dependency>
        
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.0</version>
        </dependency>

    1.完成将byte数组转换成String字符串

    上节的适配器不变,在适配器中添加一个转换器TextMessageConverter

            //1.适配器方式:默认是有自己的方法的名字的:handleMessage
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            //自己指定一个默认的方法名
            adapter.setDefaultListenerMethod("consumeMessage");
            //也可以加一个转换器:从字节数组转换为String
            adapter.setMessageConverter(new TextMessageConverter());
            container.setMessageListener(adapter);

    TextMessageConverter代码:

    package com.dwz.spring.converter;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    public class TextMessageConverter implements MessageConverter {
        //将其它对象转换成Message
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(object.toString().getBytes(), messageProperties);
        }
        
        //将Message对象转换成String
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if(null != contentType && contentType.contains("text")) {
                System.err.println("contentType:--String--" + contentType);
                return new String(message.getBody());
            }
            return message.getBody();
        }
    }

    此时我们适配器自定义的委托对象MessageDelegate的consumeMessage()接收的参数类型要与 fromMessage()返回的类型一致

    MessageDelegate类如下:

    public class MessageDelegate {
        public void consumeMessage(String messageBody) {
            System.err.println("consumeMessage默认方法,消息内容:String--" + messageBody);
        }
    }

    测试代码:

        @Test
        public void testMessage02() {
            //创建消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("text/plain");
            Message message = new Message("spring consumeMessage消息".getBytes(), messageProperties);
            rabbitTemplate.send("topic001", "spring.abc", message);
            
            rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
            
            rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
        }

    2. 支持json格式的转换器

    添加Order和Packaged这两个类

    Order类:

    package com.dwz.spring.entity;
    
    import java.io.Serializable;
    public class Order implements Serializable{
        private static final long serialVersionUID = 1L;
    
        private String id;
        
        private String name;
        
        private String content;
    
        public Order() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        public Order(String id, String name, String content) {
            super();
            this.id = id;
            this.name = name;
            this.content = content;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
        
        
    }

    Packaged类:

    package com.dwz.spring.entity;
    
    import java.io.Serializable;
    public class Packaged implements Serializable{
        /**
         * 
         */
        private static final long serialVersionUID = 1L;
    
        private String id;
        
        private String name;
        
        private String description;
    
        public Packaged() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        public Packaged(String id, String name, String description) {
            super();
            this.id = id;
            this.name = name;
            this.description = description;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getDescription() {
            return description;
        }
    
        public void setDescription(String description) {
            this.description = description;
        }
        
    }

    设置适配器中setMessageConverter(jackson2JsonMessageConverter)转换器参数

            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            container.setMessageListener(adapter);

    MessageDelegate类的consumeMessage()的入参修改为Map<String, Object>如下:

        public void consumeMessage(Map<String, Object> messageBody) {
            System.err.println("consumeMessage的map方法,消息内容:" + messageBody);
        }

    测试代码:

        /**
         *     支持json格式的转换器
         * @throws JsonProcessingException
         */
        @Test
        public void testSendJsonMessage() throws JsonProcessingException {
            Order order = new Order();
            order.setId("001");
            order.setName("消息订单");
            order.setContent("描述信息");
            
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(order);
            System.err.println("order 4 json:" + json);
            
            MessageProperties messageProperties = new MessageProperties();
            //这里一定要修改contentType为application/json
            messageProperties.setContentType("application/json");
            
            Message message = new Message(json.getBytes(), messageProperties);
            rabbitTemplate.send("topic001", "spring.order", message);
        }

     3.支持java对象转换

    更改适配器部分代码,完成Jackson2JsonMessageConverter转换器的DefaultJackson2JavaTypeMapper设置

            //1.2 DefaultJackson2JavaTypeMapper 和 Jackson2JsonMessageConverter 支持java对象转换
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
            //如果使用RabbitMQ默认的转换方式,并不会涉及到本章遇到的信任package问题,如果想自定义消息转换并且使用DefaultClassMapper作为映射,
            //肯定会出现信任package的问题,所以如果需要自定义转换的小伙伴,记住要设置trustedPackages。
            javaTypeMapper.addTrustedPackages("com.dwz.spring.entity");
            jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            container.setMessageListener(adapter);

    MessageDelegate类的consumeMessage()的入参修改为Order对象如下:

        public void consumeMessage(Order order) {
            System.err.println("order对象,消息内容, id:" + order.getId()
                              +", name:" + order.getName()
                              +", content:" + order.getContent());
        }

    测试代码:

        /**
         * json与java对象之间的转换
         * @throws JsonProcessingException
         */
        @Test
        public void testSendJavaMessage() throws JsonProcessingException {
            Order order = new Order();
            order.setId("001");
            order.setName("消息订单");
            order.setContent("描述信息");
            
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(order);
            System.err.println("order 4 json:" + json);
            
            MessageProperties messageProperties = new MessageProperties();
            //这里一定要修改contentType为application/json
            messageProperties.setContentType("application/json");
            messageProperties.getHeaders().put("__TypeId__", "com.dwz.spring.entity.Order");
            
            Message message = new Message(json.getBytes(), messageProperties);
            rabbitTemplate.send("topic001", "spring.order", message);
        }

    4.支持java对象多映射转换

    更改适配器部分代码,加入多个typeid和对象的映射

            //1.3 DefaultJackson2JavaTypeMapper 和 Jackson2JsonMessageConverter 支持java对象多映射转换
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
            Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();    
            idClassMapping.put("order", Order.class);
            idClassMapping.put("packaged", Packaged.class);
            javaTypeMapper.setIdClassMapping(idClassMapping);
            jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
            adapter.setMessageConverter(jackson2JsonMessageConverter);
            container.setMessageListener(adapter);

    MessageDelegate类的consumeMessage()添加入参为Order和Packaged对象的两个重载方法如下:

        public void consumeMessage(Order order) {
            System.err.println("order对象,消息内容, id:" + order.getId()
                              +", name:" + order.getName()
                              +", content:" + order.getContent());
        }
        
        public void consumeMessage(Packaged pack) {
            System.err.println("Packaged对象,消息内容, id:" + pack.getId()
                              +", name:" + pack.getName()
                              +", description:" + pack.getDescription());
        }

    测试代码:

    要在Headers中添加

    messageProperties2.getHeaders().put("__TypeId__", "packaged");
        @Test
        public void testSendMappingMessage() throws JsonProcessingException {
            Order order = new Order();
            order.setId("001");
            order.setName("订单消息");
            order.setContent("订单描述信息");
            
            ObjectMapper mapper = new ObjectMapper();
            String json1 = mapper.writeValueAsString(order);
            System.err.println("order 4 json:" + json1);
            
            MessageProperties messageProperties = new MessageProperties();
            //这里一定要修改contentType为application/json
            messageProperties.setContentType("application/json");
            messageProperties.getHeaders().put("__TypeId__", "order");
            
            Message message = new Message(json1.getBytes(), messageProperties);
            rabbitTemplate.send("topic001", "spring.order", message);
            
            Packaged packaged = new Packaged();
            packaged.setId("002");
            packaged.setName("包裹消息");
            packaged.setDescription("包裹描述信息");
            
            String json2 = mapper.writeValueAsString(packaged);
            System.err.println("packaged 4 json:" + json2);
            
            MessageProperties messageProperties2 = new MessageProperties();
            //这里一定要修改contentType为application/json
            messageProperties2.setContentType("application/json");
            messageProperties2.getHeaders().put("__TypeId__", "packaged");
            
            Message message2 = new Message(json2.getBytes(), messageProperties2);
            rabbitTemplate.send("topic001", "spring.packaged", message2);
        }

    5.全局转换器:convert

    更改适配器部分代码,加入全局转换器

            //1.4 全局转换器:convert
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setDefaultListenerMethod("consumeMessage");
            
            //全局转换器
            ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
            
            TextMessageConverter textConvert = new TextMessageConverter();
            convert.addDelegate("text", textConvert);
            convert.addDelegate("html/text", textConvert);
            convert.addDelegate("xml/text", textConvert);
            convert.addDelegate("text/plain", textConvert);
            
            Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
            convert.addDelegate("json", jsonConverter);
            convert.addDelegate("application/json", jsonConverter);
            
            ImageMessageConverter imageConverter = new ImageMessageConverter();
            convert.addDelegate("image/png", imageConverter);
            convert.addDelegate("image", imageConverter);
            
            PDFMessageConverter pdfConverter = new PDFMessageConverter();
            convert.addDelegate("application/pdf", pdfConverter);
            
            adapter.setMessageConverter(convert);
            container.setMessageListener(adapter);

    MessageDelegate类的consumeMessage()添加方法

        public void consumeMessage(File file) {
            System.err.println("文件对象,消息内容, " + file.getName());
        }

    添加自定义转换器ImageMessageConverter和PDFMessageConverter

    package com.dwz.spring.converter;
    
    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.util.FileCopyUtils;
    
    public class ImageMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("---------------Image MessageConverter-------------------");
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            String extName = _extName == null ? "png" : _extName.toString();
            
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "D:\dwz_temp\" + fileName + "." + extName;
            File f = new File(path);
            System.out.println(path);
            
            try {
                Files.copy(new ByteArrayInputStream(body), Paths.get(path), StandardCopyOption.REPLACE_EXISTING);
    //            FileCopyUtils.copy(body, f);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
        
    }
    package com.dwz.spring.converter;
    
    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.UUID;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    public class PDFMessageConverter implements MessageConverter {
    
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error! ");
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("---------------PDF MessageConverter-------------------");
            
            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "D:/dwz_temp/" + fileName + ".pdf";
            File f = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                e.printStackTrace();
            }
            return f;
        }
    }

    测试代码:

    图片转换测试

        @Test
        public void testSendImgMessage1() throws IOException {
            byte[] body = Files.readAllBytes(Paths.get("C:/Users/Administrator/Pictures/Saved Pictures/img02/dwz.png"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("image/png");
            messageProperties.getHeaders().put("extName", "png");
            
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "image_queue", message);
        }

    pdf文件转换测试

        @Test
        public void testSendPDFMessage2() throws IOException {
            byte[] body = Files.readAllBytes(Paths.get("F:\dwz\my\2019全新Java学习路线图.pdf"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/pdf");
            
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "pdf_queue", message);
        }
  • 相关阅读:
    从 Objective-C 里的 Alloc 和 AllocWithZone 谈起
    设计模式(26)
    iOS目录结构
    IOS didReceiveMemoryWarning 的那些事
    iOS多线程编程:线程同步总结
    我要进大厂之大数据Hadoop HDFS知识点(2)
    我要进大厂之大数据Hadoop HDFS知识点(1)
    我要进大厂之大数据ZooKeeper知识点(2)
    我要进大厂之大数据ZooKeeper知识点(1)
    rabbitmq使用延迟时报异常
  • 原文地址:https://www.cnblogs.com/zheaven/p/11912327.html
Copyright © 2011-2022 走看看