zoukankan      html  css  js  c++  java
  • Spring AMQP:MessageListenerAdapter MessageConverter

    一.MessageListenerAdapter

    消息监听适配器

    配置:

        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            //设置监听的队列
            container.setQueues(queue());
            //设置当前消费者数量
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            //重回队列
            container.setDefaultRequeueRejected(false);
            //签收机制
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            //消费端标签策略
            container.setConsumerTagStrategy(queue-> queue+"_"+ UUID.randomUUID().toString());
            //消息适配器
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            container.setMessageListener(adapter);
            return container;
        }
    
    MessageDelegate类:
    public class MessageDelegate {
    
        public void handleMessage(byte[] messageBody){
            System.out.println("默认方法,消息:"+new String(messageBody));
        }
    
        public void handleMessage(String messageBody){
            System.out.println("字符串方法,消息"+messageBody);
        }
    }
    

    注意,自定义的Delegate类,默认方法名是handlermessage

    当然,想自定义方法名,可以setDefaultListenerMethod(),这里不再演示。

    测试字符串: 

        @Test
        public void testSend1(){
            rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring");
        }

    测试字节数组:

        @Test
        public void testSend2(){
            rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring2".getBytes());
        }

    此外也可以自定义转换器

    public class MyMessageConvert implements MessageConverter {
    
        /**
         * java对象转换为Message对象
         */
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return new Message(object.toString().getBytes(),messageProperties);
        }
    
        /**
         * message对象转换为java对象
         */
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            return new String(message.getBody());
        }
    }
    

     在适配器中配置转换器

            //消息适配器
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            adapter.setMessageConverter(new MyMessageConvert());
            container.setMessageListener(adapter);
    

    配置了消息转换器后,现在无论发送的消息是字符串还是字节数组,适配器都直接进入handleMessage(String messageBody)方法。

     适配器还可以将队列名称与方法名称进行一一匹配。

    需要传入一个map,key是队列名,value是方法名

    adapter.setQueueOrTagToMethodName(queueName,method);

     源码:

    /**
    	 * Set the mapping of queue name or consumer tag to method name. The first lookup
    	 * is by queue name, if that returns null, we lookup by consumer tag, if that
    	 * returns null, the {@link #setDefaultListenerMethod(String) defaultListenerMethod}
    	 * is used.
    	 * @param queueOrTagToMethodName the map.
    	 * @since 1.5
    	 */
    	public void setQueueOrTagToMethodName(Map<String, String> queueOrTagToMethodName) {
    		this.queueOrTagToMethodName.putAll(queueOrTagToMethodName);
    	}
    

    当队列与方法绑定后,队列里的消息会被绑定的方法所处理。 

    二.MessageConverter

    消息转换器,一般自定义转换器需要实现这个接口,重写toMessage和fromMessage方法。

    Json转换器:Jackson2JsonMessageConverter,可以进行java对象的转换功能

    先加上jackson依赖:

            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.9.9</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.9.9</version>
            </dependency>

    json测试:

    在delegate类中定义处理json的方法:

        public void handleMessage(Map messageBody){
            System.out.println("json转换,消息:"+messageBody);
        }
    

    在adapter中配置Jackson2JsonMessageConverter

            //消息适配器
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            //adapter.setMessageConverter(new MyMessageConvert());
            //json形式转换器
            Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
            adapter.setMessageConverter(converter);
            container.setMessageListener(adapter);
    

    测试发送消息:

        @Test
        public void testJson() throws JsonProcessingException {
            User user = new User("张三",22);
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(user);
            MessageProperties properties = new MessageProperties();
            properties.setContentType("application/json");
            Message message = new Message(json.getBytes(), properties);
            rabbitTemplate.send("amqp.bean.topic", "amqp.send",message);
        }
    

    测试结果:

     配置DefaultJackson2JavaTypeMapper映射器:进行java对象的映射关系

            //消息适配器
            MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
            //adapter.setMessageConverter(new MyMessageConvert());
            //json形式转换器
            Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
            DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
            //设置安全包
            mapper.setTrustedPackages("com.wj.springamqp.domain");
            converter.setJavaTypeMapper(mapper);
            adapter.setMessageConverter(converter);
            container.setMessageListener(adapter);
    

    注意要setTrustedPackages,否则会报如下错误。

     测试类:

        @Test
        public void testJson() throws JsonProcessingException {
            User user = new User("张三",22);
            ObjectMapper mapper = new ObjectMapper();
            String json = mapper.writeValueAsString(user);
            MessageProperties properties = new MessageProperties();
            properties.setContentType("application/json");
            //key:_TypeId_ value:类的全路径
            //加上去后,就可以支持json转换为java对象
            properties.getHeaders().put("__TypeId__","com.wj.springamqp.domain.User");
            Message message = new Message(json.getBytes(), properties);
            rabbitTemplate.send("amqp.bean.topic", "amqp.send",message);
        }
    

    注意!!!"__TypeId__"这里下划线是四个,前面两个后面两个,不然无法转为java对象。

    Jackson2JsonMessageConverter和DefaultJackson2JavaTypeMapper支持java对象多映射转换。

            //json形式转换器
            Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
            DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
            //设置安全包
            mapper.setTrustedPackages("com.wj.springamqp.domain");
    
            Map<String, Class<?>> map = new HashMap<>();
            map.put("user", com.wj.springamqp.domain.User.class);
            map.put("stu", com.wj.springamqp.domain.Stu.class);
            mapper.setIdClassMapping(map);
    
            converter.setJavaTypeMapper(mapper);
    

     此外MessageConverter支持pdf,image的转换,在发送消息的时候,需要将文件转换为二进制字节流,然后在转换器中生成文件。

  • 相关阅读:
    如何不传入对象就获得某对象的方法---ThreadLocal类
    Linux系统主目录被更改,怎么修改回去?
    tree命令的安装
    Linux命令学习man
    当重载函数的参数是Object和Object数组的时候会发生什么情况!!!
    Linux学习(二)之内核、系统调用、库
    使用puttygen转换OpenSSH SSH2私钥为RSA PEM格式
    docker-compose使用详解
    svn迁移到gitlab
    linux快速启动http端口
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/13036210.html
Copyright © 2011-2022 走看看