zoukankan      html  css  js  c++  java
  • flume监控

    flume提供了一个度量框架,可以通过http的方式进行展现,当启动agent的时候通过传递参数 -Dflume.monitoring.type=http参数给flume agent:

    1
    2
    3
    4
    $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 
    -Dflume.monitoring.type=http
    -Dflume.monitoring.port=5653
    -Dflume.root.logger=INFO,console

    这样flume会在5653端口上启动一个HTTP服务器,访问如下地址,将返回JSON格式的flume相关指标参数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    demo:

    访问: http://flume-agent-host:5653/metrics
    结果: 其中src-1是子自定义的source名称
    {
    "SOURCE.src-1":{
    "OpenConnectionCount":"0", //目前与客户端或sink保持连接的总数量(目前只有avro source展现该度量)
    "Type":"SOURCE",
    "AppendBatchAcceptedCount":"1355", //成功提交到channel的批次的总数量
    "AppendBatchReceivedCount":"1355", //接收到事件批次的总数量
    "EventAcceptedCount":"28286", //成功写出到channel的事件总数量,且source返回success给创建事件的sink或RPC客户端系统
    "AppendReceivedCount":"0", //每批只有一个事件的事件总数量(与RPC调用中的一个append调用相等)
    "StopTime":"0", //source停止时自Epoch以来的毫秒值时间
    "StartTime":"1442566410435", //source启动时自Epoch以来的毫秒值时间
    "EventReceivedCount":"28286", //目前为止source已经接收到的事件总数量
    "AppendAcceptedCount":"0" //单独传入的事件到Channel且成功返回的事件总数量
    },
    "CHANNEL.ch-1":{
    "EventPutSuccessCount":"28286", //成功写入channel且提交的事件总数量
    "ChannelFillPercentage":"0.0", //channel满时的百分比
    "Type":"CHANNEL",
    "StopTime":"0", //channel停止时自Epoch以来的毫秒值时间
    "EventPutAttemptCount":"28286", //Source尝试写入Channe的事件总数量
    "ChannelSize":"0", //目前channel中事件的总数量
    "StartTime":"1442566410326", //channel启动时自Epoch以来的毫秒值时间
    "EventTakeSuccessCount":"28286", //sink成功读取的事件的总数量
    "ChannelCapacity":"1000000", //channel的容量
    "EventTakeAttemptCount":"313734329512" //sink尝试从channel拉取事件的总数量。这不意味着每次事件都被返回,因为sink拉取的时候channel可能没有任何数据
    },
    "SINK.sink-1":{
    "Type":"SINK",
    "ConnectionClosedCount":"0", //下一阶段或存储系统关闭的连接数量(如在HDFS中关闭一个文件)
    "EventDrainSuccessCount":"28286", //sink成功写出到存储的事件总数量
    "KafkaEventSendTimer":"482493",
    "BatchCompleteCount":"0", //与最大批量尺寸相等的批量的数量
    "ConnectionFailedCount":"0", //下一阶段或存储系统由于错误关闭的连接数量(如HDFS上一个新创建的文件因为超时而关闭)
    "EventDrainAttemptCount":"0", //sink尝试写出到存储的事件总数量
    "ConnectionCreatedCount":"0", //下一个阶段或存储系统创建的连接数量(如HDFS创建一个新文件)
    "BatchEmptyCount":"0", //空的批量的数量,如果数量很大表示souce写数据比sink清理数据慢速度慢很多
    "StopTime":"0",
    "RollbackCount":"9", //
    "StartTime":"1442566411897",
    "BatchUnderflowCount":"0" //比sink配置使用的最大批量尺寸更小的批量的数量,如果该值很高也表示sink比souce更快
    }
    }

    Flume也可发送度量信息给Ganglia,用来监控Flume。在任何时候只能启用一个Ganglia或HTTP监控。Flume默认一分钟一次周期性的向Ganglia报告度量:

    1
    2
    3
    4
    5
    6
    7
    demo:

    $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1
    -Dflume.monitoring.type=ganglia # 默认情况下flume以Ganglia3.1格式报告指标
    -Dflume.monitoring.pollFrequency=45 # 报告间隔时间(秒)
    -Dflume.monitoring.isGanglia3=true # 启用ganglia3个格式报告
    -Dflume.root.logger=INFO,console

    日志相关

    1
    2
    3
    4
    $ bin/flume-ng agent --conf conf --conf-file example.conf  --name a1 -Dflume.root.logger=INFO,console

    # -Dflume.root.logger=INFO,console 该参数将会把flume的日志输出到console,为了将其输出到日志文件(默认
    $FLUME_HOME/logs),可以将console改为LOGFILE形式,具体的配置可以修改$FLUME_HOME/conf/log4j.properties

    自定义Flume组件

    Flume本身可插拔的架构设计,使得开发自定义插件变得很容易。Flume本身提供了非常丰富的source、channel、sink以及拦截器等插件可供选择,基本可以满足生产需要。具体可以参考Flume用户文档.

    plugins.d目录

    plugins.d是flume事先约定的存放自定义组件的目录。flume在启动的时候会自动将该目录下的文件添加到classpath下,当然你也可以在flume-ng 启动时通过指定--classpath,-C <cp>参数将自己的文件手动添加到classpath下。
    相关目录说明:

    1
    2
    3
    plugins.d/xxx/lib - 插件jar
    plugins.d/xxx/libext - 插件依赖jar
    plugins.d/xxx/native - 本地库文件如 .so文件

    拦截器

    拦截器(Interceptor)是简单插件式组件,设置在Source和Source写入数据的Channel之间。Source接收到的事件在写入对应的Channel之前,拦截器都可以转换或删除这些事件。每个拦截器实例只处理同一个Source接收的事件。拦截器可以基于任意标准删除或转换事件,但是拦截器必须返回尽可能多(尽可能少)的事件,如同原始传递过来的事件.因为拦截器必须在事件写入Channel之前完成操作,只有当拦截器已成功转换事件后,RPC Source(和任何其他可能产生超时的Source)才会响应发送事件的客户端或Sink。因此尽量不要在拦截器中做大量耗时的处理操作。如果不得已这么处理了,那么需要相应的调整超时时间属性。Flume自身提供了多种类型的拦截器,比如:时间戳拦截器主机拦截器正则过滤拦截器等等。更多内容可以参考Flume Interceptors

    拦截器一般用于分析事件以及在需要的时候丢弃事件。编写拦截器时,实现者只需要写以一个实现Interceptor接口的类,同时实现Interceptor$Builder接口的Builer类。所有的Builder类必须有一个公共无参的构造方法,Flume使用该方法来进行实例化。可以使用传递到Builder类的Context实例配置拦截器。所有需要的参数都要传递到Context实例。下面是时间戳拦截器的实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    public class TimestampInterceptor implements Interceptor {

    private final boolean preserveExisting;

    /**
    * 该构造方法只能被Builder调用
    */
    private TimestampInterceptor(boolean preserveExisting) {
    this.preserveExisting = preserveExisting;
    }

    @Override
    public void initialize() {
    // no-op
    }

    /**
    * Modifies events in-place.
    */
    @Override
    public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    if (preserveExisting && headers.containsKey(TIMESTAMP)) {
    // we must preserve the existing timestamp
    } else {
    long now = System.currentTimeMillis();
    headers.put(TIMESTAMP, Long.toString(now));
    }
    return event;
    }

    /**
    * Delegates to {@link #intercept(Event)} in a loop.
    * @param events
    * @return
    */
    @Override
    public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
    intercept(event);
    }
    return events;
    }

    @Override
    public void close() {
    // no-op
    }

    /**
    * Builder which builds new instances of the TimestampInterceptor.
    */
    public static class Builder implements Interceptor.Builder {

    private boolean preserveExisting = PRESERVE_DFLT;

    @Override
    public Interceptor build() {
    return new TimestampInterceptor(preserveExisting);
    }

    //通过Context传递配置参数
    @Override
    public void configure(Context context) {
    preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
    }

    }

    public static class Constants {
    public static String TIMESTAMP = "timestamp";
    public static String PRESERVE = "preserveExisting";
    public static boolean PRESERVE_DFLT = false;
    }

    }

    注:

    1. intercept()的两个方法必须是线程安全的,因为如果source运行在多线程情况下,这些方法可能会被多个线程调用。
    2. 自定义拦截器的配置方式,interceptors type配置的是XXXInterceptor$Builder:

      1
      2
      3
      #自定义拦截器  --producer agent名称  --src-1 source名称   —-i1 拦截器名称  
      producer.sources.src-1.interceptors = i1
      producer.sources.src-1.interceptors.i1.type = com.networkbench.browser.flume.interceptor.MyBrowserInterceptor$Builder
    3. 将自定义代码打包放置到前面的plugins.d/ext-interceptors(可以自己命名)/lib目录下,启动flume时会自动加载该jar到classpath

    解析器

    Source使用嵌入式的反序列化器读取监控目录下的文件(这里以Spooling Directory Source为例),默认的反序列化器是LineDeserializer。该反序列化器会按行读取文件中的内容,封装成一个Event消息。默认一次读取的最大长度是2048个字符,你可以通过如下配置参数设置改值:

    1
    2
    # --producer agent名称  --src-1 source名称 
    producer.sources.src-1.deserializer.maxLineLength = 20480

    因此在使用LineDeserializer时对源文件内容有个粗略的估计,否则,当某行的内容超出最大长度时。该行内容会被截取成两个部分,封装成两个Event发送到channel中。这样,在某些场景下该行消息相当于非法消息了。如,某个文件按行记录一个http请求的所有内容,而事先我们无法预知一行http请求的最大长度(当然理论上你可以将maxLineLength设置成一个较大的值,解决该问题)。但是这里要说的是另外一种解决方案,很简单,参考LineDeserializer实现一个不限制最大长度的解析器(flume之所以这么设计是出于什么角度考虑?)。反序列化器的定义和前面的拦截器基本相同:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    public class LineDeserializer implements EventDeserializer {

    private static final Logger logger = LoggerFactory.getLogger(LineDeserializer.class);

    private final ResettableInputStream in;
    private final Charset outputCharset;
    private final int maxLineLength;
    private volatile boolean isOpen;

    public static final String OUT_CHARSET_KEY = "outputCharset";
    public static final String CHARSET_DFLT = "UTF-8";

    public static final String MAXLINE_KEY = "maxLineLength";
    public static final int MAXLINE_DFLT = 2048;

    LineDeserializer(Context context, ResettableInputStream in) {
    this.in = in;
    this.outputCharset = Charset.forName(context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
    this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
    this.isOpen = true;
    }

    @Override
    public Event readEvent() throws IOException {
    ensureOpen();
    String line = readLine();
    if (line == null) {
    return null;
    } else {
    return EventBuilder.withBody(line, outputCharset);
    }
    }

    ...

    // TODO: consider not returning a final character that is a high surrogate
    // when truncating
    private String readLine() throws IOException {
    StringBuilder sb = new StringBuilder();
    int c;
    int readChars = 0;
    while ((c = in.readChar()) != -1) {
    readChars++;

    // FIXME: support
    if (c == ' ') {
    break;
    }

    sb.append((char)c);

    // 限制最大长度
    if (readChars >= maxLineLength) {
    logger.warn("Line length exceeds max ({}), truncating line!", maxLineLength);
    break;
    }
    }

    if (readChars > 0) {
    return sb.toString();
    } else {
    return null;
    }
    }

    //这里和Interceptor$Builder很像
    public static class Builder implements EventDeserializer.Builder {

    @Override
    public EventDeserializer build(Context context, ResettableInputStream in) {
    return new LineDeserializer(context, in);
    }

    }

    }

    接下来的步骤和拦截器一致

    1
    2
    3
    #自定义解析器
    producer.sources.src-1.deserializers = d1
    producer.sources.src-1.deserializer = com.networkbench.browser.flume.interceptor.MyLineDeserializer$Builder

    source

    Flume提供了丰富的source类型,如Avro SourceExec SourceSpooling Directory Source .
    这里要说的是实际使用过程中遇到的一个问题。还是前面记录http请求内容的场景,为了及时分析http请求的数据,我们将记录http请求的原始文件按照分钟进行切割,然后移动到spooling directory监控目录(如/tmp-logs)下。但是由于一些原因,会出现监控目录下文件重名的情况.

    1
    2
    /tmp-logs/access_2015_10_01_16_30.log.COMPLETED   #flume处理完的文件会自动进行重命名.COMPLETED 
    /tmp-logs/access_2015_10_01_16_30.log #刚进来的文件

    这种情况下后进来的access_2015_10_01_16_30.log,在flume读取完成后会对其进行重命名,但是该文件名已经被占用了,flume就会抛出如下的异常信息,停止处理该监控目录下的其他文件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    25 九月 2015 16:48:59,228 INFO  [pool-22-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348)  - Preparing to move file /opt/nginx/tmp_logs/access-2015-09-25-13-51.log to /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
    25 九月 2015 16:48:59,229 ERROR [pool-22-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source src-1: { spoolDir: /opt/nginx/tmp_logs }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
    java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:378)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:330)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:259)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

    跟踪抛出异常的源码,SpoolDirectorySource会启动一个线程轮询监控目录下的目标文件,当读取完该文件(readEvents)之后会对该文件进行重名(rollCurrentFile),当重命名失败时会抛出IllegalStateException,被SpoolDirectoryRunnable catch重新抛出RuntimeException,导致当前线程退出,从源码看SpoolDirectoryRunnable是单线程执行的,因此线程结束后,监控目录下其他文件不再被处理:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    # SpoolDirectorySource 启动SpoolDirectoryRunnable

    executor = Executors.newSingleThreadScheduledExecutor();

    File directory = new File(spoolDirectory);
    ...

    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
    //默认500毫秒
    executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
    ...


    # SpoolDirectorySource$SpoolDirectoryRunnable.run():

    @Override
    public void run() {
    int backoffInterval = 250;
    try {
    while (!Thread.interrupted()) {
    //这里读取文件内容,当该文件没有可读内容时,会调用ReliableSpoolingFileEventReader.retireCurrentFile()->ReliableSpoolingFileEventReader.rollCurrentFile()
    List<Event> events = reader.readEvents(batchSize);
    if (events.isEmpty()) {
    break;
    }
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();

    try {
    getChannelProcessor().processEventBatch(events);
    reader.commit();
    } catch (ChannelException ex) {
    logger.warn("The channel is full, and cannot write data now. The " +
    "source will try again after " + String.valueOf(backoffInterval) +
    " milliseconds");
    hitChannelException = true;
    if (backoff) {
    TimeUnit.MILLISECONDS.sleep(backoffInterval);
    backoffInterval = backoffInterval << 1;
    backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
    backoffInterval;
    }
    continue;
    }
    backoffInterval = 250;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    }
    } catch (Throwable t) {
    //异常输出部分
    logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
    "Uncaught exception in SpoolDirectorySource thread. " +
    "Restart or reconfigure Flume to continue processing.", t);
    hasFatalError = true;
    //这里将该异常重新封装成了RuntimeException,导致当前线程退出
    Throwables.propagate(t);
    }
    }

    # ReliableSpoolingFileEventReader.rollCurrentFile():

    private void rollCurrentFile(File fileToRoll) throws IOException {

    File dest = new File(fileToRoll.getPath() + completedSuffix);
    logger.info("Preparing to move file {} to {}", fileToRoll, dest);

    // Before renaming, check whether destination file name exists
    if (dest.exists() && PlatformDetect.isWindows()) {
    /*
    * If we are here, it means the completed file already exists. In almost
    * every case this means the user is violating an assumption of Flume
    * (that log files are placed in the spooling directory with unique
    * names). However, there is a corner case on Windows systems where the
    * file was already rolled but the rename was not atomic. If that seems
    * likely, we let it pass with only a warning.
    */
    if (Files.equal(currentFile.get().getFile(), dest)) {
    logger.warn("Completed file " + dest +
    " already exists, but files match, so continuing.");
    boolean deleted = fileToRoll.delete();
    if (!deleted) {
    logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() +
    ". It will likely be ingested another time.");
    }
    } else {
    String message = "File name has been re-used with different" +
    " files. Spooling assumptions violated for " + dest;
    throw new IllegalStateException(message);
    }

    // Dest file exists and not on windows
    } else if (dest.exists()) {
    //这里抛出目标文件已经存在的异常
    String message = "File name has been re-used with different" +
    " files. Spooling assumptions violated for " + dest;
    throw new IllegalStateException(message);

    // Destination file does not already exist. We are good to go!
    } else {
    boolean renamed = fileToRoll.renameTo(dest);
    if (renamed) {
    logger.debug("Successfully rolled file {} to {}", fileToRoll, dest);

    // now we no longer need the meta file
    deleteMetaFile();
    } else {
    /* If we are here then the file cannot be renamed for a reason other
    * than that the destination file exists (actually, that remains
    * possible w/ small probability due to TOC-TOU conditions).*/
    String message = "Unable to move " + fileToRoll + " to " + dest +
    ". This will likely cause duplicate events. Please verify that " +
    "flume has sufficient permissions to perform these operations.";
    throw new FlumeException(message); } } }

    现在基本清楚了异常栈的调用逻辑,那么和前面自定义解析器一样,我们可以重写ReliableSpoolingFileEventReader以及SpoolDirectorySource的相关实现,也就是自定义一个spooling source,在rollCurrentFile()重命名失败时,做些处理措施,比如将该文件重新命名为access_2015_10_01_16_30.log(2).COMPLETED(此时文件内容已经读取完毕了)继续处理(注意要是.COMPLETED结尾,不然flume会再次读取该文件)。

    改写完成之后,就和前面自定义解析器的处理步骤一样了,打包放在plugins.d目录下,配置:

    1
    producer.sources.src-1.type = com.networkbench.flume.source.SpoolDirectoryExtSource

    总结

    基本上flume的各种组件都可以自定义开发,本人使用flume时间也没多久,截止到目前为止遇到问题还有以下几个:

    消息重发

    这个坑其实是自己挖的,当时想当然的理解flume的配置参数#producer.sinks.sink-1.requiredAcks = 1(默认是1),我设置成了10,当时使用的kafka sink,由于某个kafka节点出现了问题(还没有仔细验证,是否kafka正常时也会出现该问题?),导致flume一直重发某个时间点的数据,而最新的数据一直被阻塞(可能是被缓存在了channel中)。导致后台接收的一直是某个时间点的消息。后台想到自己改动的这个参数,改回1之后就正常了。下面是官方文档对该参数的说明:

    requiredAcks 1 (默认值) How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.

    channel溢出

    chanel溢出是因为前面的消息重发导致的,当时使用的channle是File Channel,其中有几个配置项值得注意:

    配置项默认值说明
    transactionCapacity 10000 单个事务中可以写入或读取的事务的最大数量
    maxFileSize 2146435071 每个数据文件的最大大小(字节),一旦文件达到这个大小(或一旦写入下个文件达到这个大小),该文件保存关闭并在那个目录下创建一个新的数据文件。如果此值设置为高于默认值,仍以默认值为准
    minimumRequiredSpace 524288000 channel继续操作时每个卷所需的最少空间(字节),如果任何一个挂载数据目录的卷只有这么多空间剩余,channel将停止操作来防止损坏和避免不完整的数据被写入
    capacity 1000000 channel可以保存的提交事件的最大数量
    keep-alive 3 每次写入或读取应该等待完成的最大的时间周期(秒)

    前面的channel溢出推测就是由capacity的达到了限制造成的。

  • 相关阅读:
    SQL Server中查询结果拼接遇到的小问题
    Java中的类加载器----ClassLoader
    Struts2中的namespace使用
    Windows 8.1激活问题
    Zuul的使用,路由访问映射规则
    hystrixDashboard(服务监控)
    Hystrix(服务熔断,服务降级)
    POI 操作 excel表格 (简单整理)
    Feign的介绍和使用
    自定义Ribbon的负载均衡策略
  • 原文地址:https://www.cnblogs.com/breg/p/5649363.html
Copyright © 2011-2022 走看看