这里只贴消费者的部分代码
第一部分:手动ack配置
package com.mybatis.plus.config.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * * 描述: rabbitMQ配置 * * @author 官昌洪 * @date 2021/12/17 11:24 * @version V1.0 */ @Configuration public class MessageListenerConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
第二部分:消费消息
package com.mybatis.plus.config.mq; import com.alibaba.fastjson.JSONObject; import com.mybatis.plus.entity.Log; import com.mybatis.plus.utils.EurekaUtils; import com.mybatis.plus.utils.hash.ConsistentHash; import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; @Slf4j @Component public class Receiver { @Value("${server.port}") private String port; @Autowired RabbitTemplate rabbitTemplate; @RabbitListener(id = "testDirectQueueId1", autoStartup = "false", queues = "testDirectQueue") public void consumer(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Thread.sleep(500); if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) { String msg = new String(message.getBody(), "UTF-8"); Log parseObject = JSONObject.parseObject(msg, Log.class); log.info("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue()); log.info("消息成功消费到 messageId:" + parseObject.getLogUuid() + " messageData:" + parseObject.getLogTitle() + " createTime:" + parseObject.getCreateTime()); log.info("================================"); // 收到来自主机的消息 进行一致性hash分配 发往不同的服务 // 获取服务节点 创建一致hash环 ConsistentHash consistentHash = InitConfig.consistentHash; List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2"); if (!allServiceAddr.isEmpty()) { for (Map<String, String> stringMap : allServiceAddr) { String instanceId = stringMap.get("routeKey"); // 新增1个物理节点和150个对应的虚拟节点 // String instanceId = stringMap.get("queueKey"); // 如果hash环中没有该节点 才新增 ConsistentHashNode node = consistentHash.getAccurateNode(instanceId); if (null == node) { consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId), instanceId), 150); } } } else { //没有服务提供者 将消息返回队列 channel.basicReject(deliveryTag, true); return; } channel.basicAck(deliveryTag, false); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 // 提取消息中的某个代表来源主机的标识 然后在hash环上分配目标节点 String logUuid = parseObject.getLogUuid(); ConsistentHashNode node = consistentHash.getNode(logUuid); log.info("主机标识:{},分配节点:{}", logUuid, node.getTarget()); //向指定路由发送消息 // todo 问题 这里怎么保证队列预先创建初始化好 解决方案 先从配置文件获取队列名称 新增服务时 需要重启服务 rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg); // planTwo(parseObject); log.info(">>>>>>>>>>>>消费消息成功!"); } } catch (Exception e) { log.info(">>>>>>>>>>>>消费消息失败!失败消息ID:{}, 失败原因:{}", deliveryTag, e.getMessage()); channel.basicReject(deliveryTag, true); } } }
第三部分:控制消费者开启,关闭
@Autowired private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("/startCustomer") public R startCustomer(){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1"); consumer.start(); return R.ok(); } @RequestMapping("/stopCustomer") public R stopCustomer(){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1"); consumer.stop(); return R.ok(); }
主要还是指定 RabbitListener 注解的ID属性进行控制