准备:
1.RabbitMQ安装(我是在window环境下安装的)。
安装完成之后进入登录页面配置,默认地址:http://localhost:15672
2.创建一个SpringBoot项目。
配置文件:
#rabbitmq rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 123456 virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: none concurrency: 1 max-concurrency: 1 retry: enabled: false
java代码
RabbitMQ配置:
/** * 消息列队配置 * * @author My */ @Component public class RabbitmqConfig { private Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class); @Autowired private RabbitTemplate rabbitTemplate; // 消息交换机 public final static String DIRECT_EXCHANGE = "directExchange"; // 日志收集 public final static String ROUTING_KEY_LOGGER = "directLoggerQueue"; }
配置交换机
@Bean public DirectExchange directExchange() { // return new DirectExchange(DIRECT_EXCHANGE, true, false); logger.debug("RabbitMQ交换机初始化"); return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build(); }
配置logger消息列队
@Bean public Queue directLoggerQueue() { return QueueBuilder.durable(ROUTING_KEY_LOGGER).build(); }
将消息列队和交换机绑定
@Bean public Binding bindingDirectLoggerQueue(Queue directLoggerQueue, DirectExchange directExchange) { return BindingBuilder.bind(directLoggerQueue).to(directExchange).with(ROUTING_KEY_LOGGER); }
生成者:
/** * 日志收集 * 生产者 * @author My */ @Component public class LoggerProducer { private Logger logger = LoggerFactory.getLogger(LoggerProducer.class); @Autowired private AmqpTemplate amqpTemplate; /** * 添加消息 * @param object */ public void send(Object object) { String jsonStr = JSONObject.toJSONString(object); amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY_LOGGER, jsonStr); logger.info("日志消息已经发送++++++++++++++++++"); } }
我这里传输的时候是对数据进行了json格式化,也可以根据自己的业务进行修改。
RabbitmqConfig.DIRECT_EXCHANGE 交换机的名字
RabbitmqConfig.ROUTING_KEY_LOGGER Queue列队的名字
消费者:
/** * 日志收集处理 * 消费者 * @author My */ @Component public class LoggerConsumer { private Logger logger = LoggerFactory.getLogger(LoggerConsumer.class); @Autowired private ILogService iLogService; @RabbitHandler @RabbitListener(queues = RabbitmqConfig.ROUTING_KEY_LOGGER) public void process(String json, Message amqpMessage, Channel channel) throws Exception { logger.debug("logger接收到消息:{}", json); LogModel log = JSONObject.parseObject(json, LogModel.class); iLogService.insert(log); } }
这里@RabbitListener注解中的queues需要和生产者中的名字一致。
调用的地方注入之后使用就可以了。
/** 日志收集通道 */ @Autowired private LoggerProducer loggerProducer;
整合完成。