zoukankan      html  css  js  c++  java
  • SpringBoot Rabbitmq接收消息

    官网地址:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqp

    引入依赖:

      <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>

    接收消息:

    @Component
    public class RabbitmqListener {
        @RabbitListener(queues = "queue-1",containerFactory="myFactory")
        public void consumerMsgListenerqueue1(Message content) {
            System.out.println(messageToJson(content));
        }
        @RabbitListener(queues = "queue-2")
        public void consumerMsgListenerqueue2(byte[] content) {
           // System.out.println(messageToJson(content));
        }
    
        public Object messageToJson(Message message){
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            return  jackson2JsonMessageConverter.fromMessage(message);
        }
    }

    容器类:

    @Configuration
    public class RabbitmqConfig {
    
            @Bean
            public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
                SimpleRabbitListenerContainerFactory factory =
                        new SimpleRabbitListenerContainerFactory();
                configurer.configure(factory, connectionFactory());
                factory.setMessageConverter(new MyMessageConverter());
                return factory;
            }
    
            @Bean
            public ConnectionFactory connectionFactory() {
                ConnectionFactory connectionFactory = new CachingConnectionFactory();
                ((CachingConnectionFactory) connectionFactory).setHost("47.75.152.XX0");
                ((CachingConnectionFactory) connectionFactory).setPort(5672);
                ((CachingConnectionFactory) connectionFactory).setUsername("qijie");
                ((CachingConnectionFactory) connectionFactory).setPassword("qijie.+");
    //        ExecutorService service= Executors.newFixedThreadPool(20);//线程池
    //        ((CachingConnectionFactory) connectionFactory).setExecutor();
                return connectionFactory;
            }
    }
    MyMessageConverter: 消息转换回json格式
    public class MyMessageConverter implements MessageConverter {
    
        @Override
        public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    
            return null;
        }
    
        @Override
        public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties, Type genericType) throws MessageConversionException {
    
            return null;
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
            return message;
        }
    }
     
  • 相关阅读:
    Transcation And Lock--SQL SERVER 事务隔离级别
    Transaction And Lock--常用的查询事务和锁的语句
    使用shell读取文本文件发送到kafka
    VIM打开shell脚本中文乱码解决
    shell中日期操作
    oozie常见错误问题
    error: No implicit Ordering defined for Any
    启动mysql时显示:/tmp/mysql.sock 不存在的解决方法
    mysql中创建用户和赋权限
    (转)maven3.3.9编译oozie4.3.0
  • 原文地址:https://www.cnblogs.com/shiguotao-com/p/10445248.html
Copyright © 2011-2022 走看看