zoukankan      html  css  js  c++  java
  • SpringBoot与RabbitMQ进行整合

    准备:

    1.RabbitMQ安装(我是在window环境下安装的)。

    安装完成之后进入登录页面配置,默认地址:http://localhost:15672

    2.创建一个SpringBoot项目。

    配置文件:

      #rabbitmq
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: 123456
        virtual-host: /
        publisher-confirms: true
        publisher-returns: true
        listener:
          simple:
            acknowledge-mode: none
            concurrency: 1
            max-concurrency: 1
            retry:
              enabled: false

    java代码

    RabbitMQ配置:

    /**
     * 消息列队配置
     * 
     * @author My
     */
    @Component
    public class RabbitmqConfig {
        private Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        // 消息交换机
        public final static String DIRECT_EXCHANGE = "directExchange";
        // 日志收集
        public final static String ROUTING_KEY_LOGGER = "directLoggerQueue";
    }

    配置交换机

    @Bean
    public DirectExchange directExchange() {
        // return new DirectExchange(DIRECT_EXCHANGE, true, false);
        logger.debug("RabbitMQ交换机初始化");
        return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
    }

    配置logger消息列队

    @Bean
    public Queue directLoggerQueue() {
       return QueueBuilder.durable(ROUTING_KEY_LOGGER).build();
    }

    将消息列队和交换机绑定

    @Bean
    public Binding bindingDirectLoggerQueue(Queue directLoggerQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directLoggerQueue).to(directExchange).with(ROUTING_KEY_LOGGER);
    }

    生成者:

    /**
     * 日志收集
     * 生产者
     * @author My
     */
    @Component
    public class LoggerProducer {
    
        private Logger logger = LoggerFactory.getLogger(LoggerProducer.class);
        
        @Autowired private AmqpTemplate amqpTemplate;
        
        /**
         * 添加消息
         * @param object
         */
        public void send(Object object) {
            String jsonStr = JSONObject.toJSONString(object);
            amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY_LOGGER, jsonStr);
            logger.info("日志消息已经发送++++++++++++++++++");
        }
    }

    我这里传输的时候是对数据进行了json格式化,也可以根据自己的业务进行修改。

    RabbitmqConfig.DIRECT_EXCHANGE 交换机的名字
    RabbitmqConfig.ROUTING_KEY_LOGGER Queue列队的名字
     

    消费者:

    /**
     * 日志收集处理
     * 消费者
     * @author My
     */
    @Component
    public class LoggerConsumer {
        private Logger logger = LoggerFactory.getLogger(LoggerConsumer.class);
    
        @Autowired private ILogService iLogService;
        
        @RabbitHandler
        @RabbitListener(queues = RabbitmqConfig.ROUTING_KEY_LOGGER)
        public void process(String json, Message amqpMessage, Channel channel) throws Exception {
            logger.debug("logger接收到消息:{}", json);
            LogModel log = JSONObject.parseObject(json, LogModel.class);
            iLogService.insert(log);
    
        }
    }

    这里@RabbitListener注解中的queues需要和生产者中的名字一致。

    调用的地方注入之后使用就可以了。

    /** 日志收集通道 */
    @Autowired
    private LoggerProducer loggerProducer;

    整合完成。

  • 相关阅读:
    算法初步——哈希表B.1038统计同成绩学生
    算法初步——哈希表B10133.旧键盘打字 (注意bool型数组的赋值为true的方法)
    算法初步——哈希表B1029/A1084. 旧键盘
    算法初步——排序 A1012.The Best Rank(25)
    《思维导图》——东尼博赞
    算法初步——排序B1015/A1062.德才论
    入门模拟——(字符串处理)A1001. A+B Format(20)
    RMQ问题(线段树+ST算法)
    PKU 2406 Power Strings(KMP最长循环不重叠字串)
    KMP算法 kuangbin
  • 原文地址:https://www.cnblogs.com/se7end/p/9522555.html
Copyright © 2011-2022 走看看