zoukankan      html  css  js  c++  java
  • RocketMQ 怎样解决为了 实时拉取消息 而不得不一直轮询的问题

    我用的是DefaultMQPushConsumer,启动一个consumer的时候,根据之前的博文,push其实还是一次次的pullrequest。这里就有个问题:如果需要实时性很高,broker新收到一条消息之后,马上就要传递给订阅的consumer,那么consumer这边就需要不停的轮询,一次pullrequest收不到消息,马上进行下一次请求,这样就非常的耗费资源。

    这其实和线程竞争锁很像,rocketMQ的解决办法也和锁竞争的道理很像,看具体实现:

    1、broker这边,请求过来,如果有新消息返回,在consumer这边,异步请求的回调函数pullCallback中,判断pullResult不为null,那么把消息存到processQueue中之后,马上发起下一个请求。

    2、如果broker没有获取到新消息,并不会马上返回pullRequest(consumer那边的发送pullRequest的请求本来就是异步的,不用担心等待的问题),而是会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中。而在ReputMessageService的doReput方法会每隔一毫秒扫描commitLog,如果有新消息,会建立索引,并同时判断之前有没有pulRequest在等待这个消息,如果有--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,返回请求,这样就避免了不停的轮询。

    这里面会出现的异常情况:

    先看消费者这边,如果长时间没有订阅的消息到达broker---这是绝大多数的情况,那么消费者这边的responseTable中存的responseFuture就一直得不到响应。实际上会有个定时任务扫描responseTable,代码逻辑:nettyRemotingClient.start--->NettyRemotingClient.this.scanResponseTable(),定期(默认间隔30秒)取出过期(30秒 + 1秒)的responseFuture,执行callback的operationComplete方法,而pullRequest的operationComplete会判断responseFuture的responseCommand属性为不为null,没有得到响应的话是为null的,那么会进入else中pullCallback.onException,点进去看,是把pullRequest取出再放入队列中一次(其实这里也是重复消费的一个因素)。

    还有的情况就是某次pullRequest的请求已经发出,但是broker并没有收到而是在网络中丢掉了,或者说broker的响应消息没有成功到达consumer,这两种情况和上面说的一样,会导致过段时间再扫描,再拉取,只不过就是broker有消息到达,不能及时响应consumer,而是只能响应接下来的扫描提交的第二次消息,这样会影响时效(可以把上面说的扫描的间隔由30秒降低为3秒),不过好在这个订阅的事件不会中断。

    还有个问题,broker这里suspendPullRequest暂时扣下来的pullRequest如果一直没有消息到来去唤醒,那么consumer那边到期了就会再发一次请求,这样broker这边的pullRequest就会越积越多。对于这个问题broker这边也有定时任务检测,过期了就模拟消息到来唤醒,这次如果不成功获取消息,不再suspend,而是返回noMessage。具体代码逻辑:PullRequestHoldService是一个ServiceThread的子类,brokerController那里会start,run方法里面是上次提到的重写的countDownLatch循环wait5秒或者1秒(具体看配置文件中longPollingEnable的值),其实也就是个定时的周期任务,checkHoldRequest--->notifyMessageArriving--->executeRequestWhenWakeup也就是发现过期了(suspendTimestamp + timeoutMillis:CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND的值,默认30秒),模拟消息到来唤醒的过程,注意,唤醒之后的PullMessageProcessor.this.processRequest方法中的参数brokerAllowSuspend传入的是false,所以即使再获取不到,也会直接给出nomesage的响应而不是suspend了

  • 相关阅读:
    uva-10160-枚举
    zk-systemd
    c++官方文档-枚举-联合体-结构体-typedef-using
    c++官方文档-动态内存
    c++官方文档-指针
    c++官方文档-命名空间
    c++官方文档-模版函数和重载
    c++官方文档-按值传递和按引用传递
    c++官方文档
    HDU 1068
  • 原文地址:https://www.cnblogs.com/chuliang/p/13141279.html
Copyright © 2011-2022 走看看