zoukankan      html  css  js  c++  java
  • RocketMQ源码 pushConsumer的Clustering模式下,是如何保证集群消费的特性的?

    先说特性:一条消息只被集群中的一个消费者消费。

    之前的文章里提到在consumer启动之后,rebalanceService.start()在while循环中,每20秒doRebalance一次,doRebalance的逻辑就是去nameSrv上获取这个topic下面所有的broker+queue信息,再去其中的一个broker上获取当前consumerGroup下属的所有consumer信息(按照先后注册的顺序排列,之所以任意一个broker上面会有所有consumer的信息是因为每一个consumer都要和所有的broker注册并保持心跳),然后把queue平均分配给consumer,如果queue的数量大于consumer,会存在一个或以上的queue分配给一个consumer的情况,但是同一个queue不会分配给多个consumer。这样就保证了一个队列上的消息只会被一个消费者消费

    这里有个问题:如果集群中一个consumer关闭或者断开连接,或者新增了一个consumer,是怎样马上感知到,然后重新均匀分配的?

    一、先说新增一个consumer,在它start的时候,会有sendHeartbeatToAllBrokerWithLock,而broker收到hearbeat后,会有registerConsumer,然后调用consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE...)方法,notifyConsumerIdsChanged向所有的consumer发起NOTIFY_CONSUMER_IDS_CHANGED请求,而consumer在收到后,会调用notifyConsumerIdsChanged,里面有rebalanceImmediately,调用waitPoint.countDown(),放行上面说的要等待20秒的doRebalance;

    二、再说一个consumer断开了,close方法会进入inActive和unRegister方法,而异常/正常关闭会先进入exceptionCaught,再执行inActive和unRegiste方法,服务端用的handler是NettyConnectManageHandler,其点进去其inactive方法看会有NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));往一个继承了ServiceThread的NettyEventExecutor的队列中放NettyEvent,而它的run方法是从这个队列中取出Event,按照当前是CLOSE的类型,点进去ConsumerManager的doChannelCloseEvent方法,又进入了上面说的consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE...)方法,后面就和新增的逻辑一样了 。另外,因为异常关闭还会先进入inactive方法,放入的NettyEventType是EXCEPTION,而取出这个event而是执行ConsumerManager的doChannelCloseEvent,也就是说doChannelCloseEvent要执行两次。不过doChannelCloseEvent的逻辑是去consumerTable中根据groupName取出ConsumerGroupInfo,再在里面删除当前channel信息,channel只能被删一次,所以是幂等的。

    注意:上面说的只是怎样做到一个队列只被集群下的一个消费者监听,从而在大体上保证一条消息只被一个消费者消费,但是在复杂的网络通信的环境中,一个Consumer的下线和新增是有可能导致在broker中队列的重新分配过程中,一条消息被两个队列先后重复消费的。

  • 相关阅读:
    将台湾与山西的资源进行整合,搭建晋台两地商品营销平台
    范姜锋:致力于协助台湾青年创业,融入“一带一路”建设
    不甘于平凡,他靠借钱入行做电器生意,年销售额竟突破亿元?
    研究生接手父亲的事业,当起“猪妹”每天和猪打交道
    面对找不到工作的困难,小伙选择创业开酒吧,月收入竟达到了6万
    django 直接将数据分配给前台
    Centos7下安装与卸载Jdk1.8
    从“挖光缆”到“剪网线”|蚂蚁金服异地多活的微服务体系
    推进“互联网+政务服务” 加快新型智慧城市建设
    推进“互联网+政务服务” 加快新型智慧城市建设
  • 原文地址:https://www.cnblogs.com/chuliang/p/12433238.html
Copyright © 2011-2022 走看看