zoukankan      html  css  js  c++  java
  • RockeMQ-存储机制-可用性策略

    broker的可用性策略-快速失败机制

    具体实现在BrokerFastFailure,会执行一个定时任务扫描写消息任务的队列,当发现ospagecache繁忙的时候,就取出一个请求任务快速返回,直到OSPageCache不在繁忙。

    然后会遍历任务队列,如果发现某一个请求任务已经超时,那么也会立即返回避免producer堵塞。

    具体实现逻辑:

        private void cleanExpiredRequest() {
            while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
                try {
                    if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                        final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                        if (null == runnable) {
                            break;
                        }
    
                        final RequestTask rt = castRunnable(runnable);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                    } else {
                        break;
                    }
                } catch (Throwable ignored) {
                }
            }
    
            while (true) {
                try {
                    if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                        // peek不删除元素
                        final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
                        if (null == runnable) {
                            break;
                        }
                        final RequestTask rt = castRunnable(runnable);
                        if (rt == null || rt.isStopRun()) {
                            break;
                        }
    
                        final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                        if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
                            if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
                                rt.setStopRun(true);
                                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
                            }
                        } else {
                            break;
                        }
                    } else {
                        break;
                    }
                } catch (Throwable ignored) {
                }
            }
        }

    在remote的NettyRemotingAbstract类中的processRequestCommand,会把远程请求处理包装成一个RequestTask 然后放到sendMessageExecutor中

     final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                    pair.getObject2().submit(requestTask);
    
    

    实际上会把任务放到队列中 this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

    取出来的时候,进行强转。final RequestTask rt = castRunnable(runnable);

  • 相关阅读:
    [BZOJ1222/Luogu2224][HNOI2001]产品加工
    [BZOJ1079/Luogu2476][SCOI2008]着色方案
    [BZOJ3098]Hash Killer II
    [BZOJ1818][CQOI2010]内部白点
    [BZOJ1497/Luogu4174][NOI2006]最大获利
    [BZOJ2330/Luogu3275][SCOI2011]糖果
    [BZOJ1208/Luogu2286][HNOI2004]宠物收养场
    [BZOJ1054/Luogu4289][HAOI2008]移动玩具
    Com组件介绍
    webBrowse官方说明
  • 原文地址:https://www.cnblogs.com/gaojy/p/15096810.html
Copyright © 2011-2022 走看看