1.问题背景
一个事件中心接收网关通过ActiveMq上报的告警事件,处理后持久化到数据库,消息模型为发布订阅模式。为了实现高可用,决定将事件中心进行集群部署,运行两个实例。
但是由于消息模型为发布/订阅(publish/subscribe,topic),每个eps实例都会收到告警消息。如不加以控制,势必会造成告警消息重复消费的问题。
即我们需要不同的应用系统关心相同的消息,同时单个应用系统内部又可以部署多个实例达到负载均衡和故障转移,提高系统的健壮性。
2.解决思路
我们首先想到可以通过消费端增加检测重复消息的逻辑,来解决重复消费的问题,但是这种方式增加额外判断逻辑,且浪费消费端性能,并不可取。
实际上我们可以通过ActiveMq的一些高级特性来很好的解决此问题。ActiveMq提供了虚拟主题(Virtual Topics)的功能,如下图:
即通过一些配置后,单个应用系统内多个实例监听同一个queue,实例之间即可对消息进行平均消费,共同承担消费的功能。如果其中一个eps实例宕机,其他实例仍可以正常消费消息,消息不会丢失遗漏。
Virtual Topic这个功能特性可以关闭,即useVirtualTopics属性,默认为true,设置为false即可关闭此功能。当此功能开启,并且使用了持久化的存储时,ActiveMq的broker启动的时候会从持久化存储里拿到所有的Topic的名称,如果名称模式与VirtualTopics匹配,则把它们添加到系统的Virtual Topics列表中去。当有consumer访问此VirtualTopics时,系统会自动创建持久化的queue,并在每次Topic收到消息时,分发到具体的queue。
3.使用方法
虚拟主题有两种使用方式,下面分别进行介绍。
3.1 topic-queue名称匹配方式
此种方式需要发布者发布的Topic以“VirtualTopic.”这样的前缀来命名(大小写敏感)。比如我们定义一个VirtualTopic.event,然后发布者将消息发布到VirtualTopic.event。订阅者需要订阅名称为”Consumer.*. VirtualTopic.event”这样的队列。
下面使用springboot搭建工程进行演示,由于配置简单只涉及到topic和queue的名称,下面只显示关键步骤。
首先发布者创建名称VirtualTopic.event的topic,然后每隔3秒向此topic发布一条消息,
@Service
public class JmsSendMessageDemo {
@Autowired
private JmsTemplate jmsTemplate;
private Integer i = 0;
@Scheduled(fixedRate = 3000)
public void sendMessage(){
i++;
Event event = new Event("触碰", i, new Date());
jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.event"),JsonUtil.object2Json(event));
//System.out.println("发送消息: "+event);
}
}
然后建立两个应用作为消费者分别监听名为“Consumer.A.VirtualTopic.event“的queue,
第一个消费者:
@Service
public class JmsMessageListener {
@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")
public void receiveMessage(String message){
System.out.println("1 接收到消息: "+JsonUtil.json2Object(message,Event.class));
}
}
第二个消费者:
@Service
public class JmsMessageListener {
@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")
public void receiveMessage(String message){
System.out.println(" 2 接收到消息: "+JsonUtil.json2Object(message,Event.class));
}
}
最后启动两个消费者和发布者,通过两个消费者的输出窗口可以看到他们共同承担了消息的消费:
1 接收到消息: Event{name='触碰', id=2, date=Tue Aug 28 21:18:50 CST 2018}
1 接收到消息: Event{name='触碰', id=4, date=Tue Aug 28 21:18:56 CST 2018}
1 接收到消息: Event{name='触碰', id=6, date=Tue Aug 28 21:19:02 CST 2018}
1 接收到消息: Event{name='触碰', id=8, date=Tue Aug 28 21:19:08 CST 2018}
1 接收到消息: Event{name='触碰', id=10, date=Tue Aug 28 21:19:14 CST 2018}
1 接收到消息: Event{name='触碰', id=12, date=Tue Aug 28 21:19:20 CST 2018}
1 接收到消息: Event{name='触碰', id=14, date=Tue Aug 28 21:19:26 CST 2018}
1 接收到消息: Event{name='触碰', id=16, date=Tue Aug 28 21:19:32 CST 2018}
1 接收到消息: Event{name='触碰', id=18, date=Tue Aug 28 21:19:38 CST 2018}
2 接收到消息: Event{name='触碰', id=1, date=Tue Aug 28 21:18:47 CST 2018}
2 接收到消息: Event{name='触碰', id=3, date=Tue Aug 28 21:18:53 CST 2018}
2 接收到消息: Event{name='触碰', id=5, date=Tue Aug 28 21:18:59 CST 2018}
2 接收到消息: Event{name='触碰', id=7, date=Tue Aug 28 21:19:05 CST 2018}
2 接收到消息: Event{name='触碰', id=9, date=Tue Aug 28 21:19:11 CST 2018}
2 接收到消息: Event{name='触碰', id=11, date=Tue Aug 28 21:19:17 CST 2018}
2 接收到消息: Event{name='触碰', id=13, date=Tue Aug 28 21:19:23 CST 2018}
2 接收到消息: Event{name='触碰', id=15, date=Tue Aug 28 21:19:29 CST 2018}
2 接收到消息: Event{name='触碰', id=17, date=Tue Aug 28 21:19:35 CST 2018}
2 接收到消息: Event{name='触碰', id=19, date=Tue Aug 28 21:19:41 CST 2018}
2 接收到消息: Event{name='触碰', id=21, date=Tue Aug 28 21:19:47 CST 2018}
2 接收到消息: Event{name='触碰', id=23, date=Tue Aug 28 21:19:53 CST 2018}
然后我们将消费者2关闭(宕机),可以看到消费者1消费了所有的消息
1 接收到消息: Event{name='触碰', id=134, date=Tue Aug 28 21:25:26 CST 2018}
1 接收到消息: Event{name='触碰', id=135, date=Tue Aug 28 21:25:29 CST 2018}
1 接收到消息: Event{name='触碰', id=136, date=Tue Aug 28 21:25:32 CST 2018}
1 接收到消息: Event{name='触碰', id=137, date=Tue Aug 28 21:25:35 CST 2018}
1 接收到消息: Event{name='触碰', id=138, date=Tue Aug 28 21:25:38 CST 2018}
通过以上试验,我们看到通过虚拟主题的方式,两个实例可以通过监听同一个queue来平均消费消息,如果其中一个宕机,另一个会承担起所有的消息消费。
上面的方式需要发布者和订阅者统一对命名规范,如果发布者和订阅者已经存在,就需要统一升级,比较麻烦。实际上我们还可以拦截器的方法。
3.2 拦截器方式
这种方式需要修改ActiveMq的配置文件/conf/activemq.xml,添加以下拦截配置:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="event" prefix="demo.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
然后发布者只需要往event主题上发布消息。订阅者通过订阅类似demo.A. event的队列的方式来消费。其他没有集群部署的应用仍可以订阅event主题进行消费。
此种方式的优点在于我们不需要对发布者和不需要改造的订阅者做任何变动,需要增加或者改造的订阅者使用虚拟主题的方式进行订阅即可达到负载均衡和故障转移的目的,当无法约束发布者发布的topic规范时,这种方式很有用。当然缺点就是需要修改ActiveMQ的配置,也就是说需要重启ActiveMQ,这对于已经上线的平台来说可能造成消息丢失。
参考:虚拟主题开发