  • Kafka Study Notes: Accumulated Experience with Kafka Application Problems

    0x00 Syncing Kafka configuration files

    To make the Kafka process write GC logs the next time it restarts, add the GC logging options by editing bin/kafka-server-start.sh:

    export KAFKA_OPTS="-Xms4G -Xmx8G -Xmn3G -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties -Xloggc:/data0/kafka/log/gc.log -verbose:gc -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCApplicationStoppedTime"
    
    1. Write the script syncProperty.sh as follows:
    . /etc/bashrc
    . /etc/profile
    echo qwe123 > password.pass
    chmod 600 password.pass
    sudo chown root:root password.pass
    sudo rsync root@10.39.3.75::shellResult/huangqiang/kafka-server-start.sh /usr/local/kafka-0.8.0-beta1-src/bin/kafka-server-start.sh --password-file=password.pass
    sudo rsync root@10.39.3.75::shellResult/huangqiang/kafka-server-start.sh /usr/local/kafka-0.8.0-release/bin/kafka-server-start.sh --password-file=password.pass
    
    2. Upload the script files to the sync host:
    • export RSYNC_PASSWORD=qwe123 && rsync kafka-server-start.sh root@10.39.3.75::shellResult/huangqiang/ && rsync syncProperty.sh root@10.39.3.75::shellResult/huangqiang/
    3. Run the following on the command line of each client machine:
    • export RSYNC_PASSWORD=qwe123 && rsync root@10.39.3.75::shellResult/huangqiang/syncProperty.sh ./ && sh syncProperty.sh

     

    0x01 Bad metadata on the partition leader's broker: NotLeaderForPartitionException

    Some machines log the following error:

    [2016-10-09 15:00:00,504] WARN [ReplicaFetcherThread--1-17], error for partition [weibo_common_act2,14] to broker 17 (kafka.server.ReplicaFetcherThread)
    kafka.common.NotLeaderForPartitionException
            at sun.reflect.GeneratedConstructorAccessor4.newInstance(Unknown Source)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
            at java.lang.Class.newInstance(Class.java:374)
            at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
            at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
            at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
            at kafka.utils.Logging$class.warn(Logging.scala:88)
            at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
            at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:156)
            at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
            at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
            at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
            at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    

    The server.log on broker 17 contains the following warning:

    [2016-10-09 15:00:02,111] WARN [KafkaApi-17] Fetch request with correlation id 82105147 from client ReplicaFetcherThread--1-17 on partition [weibo_common_act2,14] failed due to Leader not local for partition [weibo_common_act2,14] on broker 17 (kafka.server.KafkaApis)
    

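    Analysis: the two replicas of partition [weibo_common_act2,14] live on brokers [8,17]. Broker 8 no longer considers itself the rightful leader, so this error is thrown. Broker 8 needs to be restarted (that is, restart the broker that hosts the partition's leader).

    When does this problem occur?
    A partition is assigned 2 replicas, but its In Sync Replicas (ISR) list contains 3. If kafka-preferred-replica-election.sh is run against the partition in this state, the exception above occurs. The log looks like this: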

    [2016-10-09 16:38:21,752] INFO [Replica Manager on Broker 17]: Handling LeaderAndIsr request Name:LeaderAndIsrRequest;Version:0;Controller:14;ControllerEpoch:33;CorrelationId:81;ClientId:id_14-host_10.39.4.215-port_19092;PartitionState:(weibo_common_act2,4) -> (LeaderAndIsrInfo:(Leader:8,ISR:17,15,8,LeaderEpoch:21,ControllerEpoch:33),ReplicationFactor:2),AllReplicas:8,17);Leaders:id:8,host:10.39.4.210,port:19092 (kafka.server.ReplicaManager)
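    The mismatch described above (an ISR that contains a broker outside the assigned replica list) can be read straight from a LeaderAndIsr log line. The following is a minimal Python sketch, not a Kafka tool: the function name isr_outside_replicas and the regular expression are assumptions written against the format of the log line shown here, and other Kafka versions may format the line differently.

```python
import re

# Assumed layout, taken from the single log line quoted in this section.
PARTITION_STATE = re.compile(
    r"\((?P<topic>[^,()]+),(?P<partition>\d+)\) -> "
    r"\(LeaderAndIsrInfo:\(Leader:(?P<leader>\d+),ISR:(?P<isr>[\d,]+),"
    r"LeaderEpoch:\d+,ControllerEpoch:\d+\),"
    r"ReplicationFactor:(?P<rf>\d+)\),AllReplicas:(?P<replicas>[\d,]+)\)"
)


def isr_outside_replicas(log_line):
    """Return partition details plus ISR members that are not assigned replicas.

    Hypothetical helper: returns None when the line does not match.
    """
    match = PARTITION_STATE.search(log_line)
    if match is None:
        return None
    isr = [int(broker) for broker in match.group("isr").split(",")]
    replicas = {int(broker) for broker in match.group("replicas").split(",")}
    return {
        "topic": match.group("topic"),
        "partition": int(match.group("partition")),
        "leader": int(match.group("leader")),
        "isr": isr,
        "replicas": sorted(replicas),
        # Brokers listed as in-sync although they are not assigned replicas.
        "extra": [broker for broker in isr if broker not in replicas],
    }
```

    Applied to the log line above, the sketch reports broker 15 as the ISR member that is not an assigned replica.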
    

     

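    0x02 Consumer offset shifts unexpectedly

    Related post: Monitoring Kafka consumer lag with Burrow

    Several consumer rebalances occurred before the offsets were reset. A rebalance usually happens when consumers leave or join the consumer group, or when a new topic or partition becomes available for consumption. During a rebalance, each consumer goes through the following steps in order:

    • Stop consuming data
    • Commit its offsets
    • Reassign partitions across the group
    • Fetch the offsets of its newly assigned partitions
    • Resume consuming data

    In the logs printed earlier, the line containing initOffset shows the position from which the consumer will start consuming.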
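    The five steps above can be sketched as a small simulation. This is only an illustration under stated assumptions: simulate_rebalance is a hypothetical function, the round-robin assignment is a simplification and not Kafka's actual assignment strategy, and committed is assumed to already hold an entry for every partition. It shows why a failed commit makes the new owner start from an older offset.

```python
def simulate_rebalance(consumers, positions, committed, commit_ok=True):
    """Return ({consumer: {partition: init_offset}}, committed_after).

    positions: offsets the consumers had reached before the rebalance.
    committed: offsets stored before the rebalance (one per partition).
    """
    committed = dict(committed)
    # Steps 1-2: consumers stop fetching and commit their current positions.
    if commit_ok:
        committed.update(positions)
    # Step 3: redistribute partitions across the group (simplified round-robin).
    members = sorted(consumers)
    assignment = {member: {} for member in members}
    for index, partition in enumerate(sorted(committed)):
        owner = members[index % len(members)]
        # Step 4: the new owner reads the committed offset; this plays the
        # role of the initOffset value mentioned in the text.
        assignment[owner][partition] = committed[partition]
    # Step 5: each owner resumes consuming from its init offsets.
    return assignment, committed
```

    With commit_ok=False the returned init offsets equal the old committed values, so everything consumed after the last successful commit is read again.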

     

    0x03 kafka.common.NotLeaderForPartitionException

     WARN [ReplicaFetcherThread-3-9], error for partition [ols_test,0] to broker 9 (kafka.server.ReplicaFetcherThread)
    kafka.common.NotLeaderForPartitionException
            at sun.reflect.GeneratedConstructorAccessor2.newInstance(Unknown Source)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    

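    Analysis: the machine hosting the leader of ols_test partition 0 cannot obtain correct information about the partition. Kafka Manager shows a latest offset of 0 for this partition, which is abnormal. We suspected that the metadata for the ols_test topic on that machine was wrong, but the partition count in the topic metadata matches ZooKeeper, so there may be another cause.

    Fix: after switching the leader with kafka-preferred-replica-election.sh, neither the new leader nor the original machine logs similar errors, and Latest Offset updates normally again.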

     

    0x04 Building a project with both Scala and Java code using Maven

    mvn clean scala:compile compile package

     

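    0x05 gmond service unavailable

    Symptom: the Ganglia service on the central host was unavailable; telnet 10.39.4.204 8649 returned no data for a long time. After it was restarted, the other 28 Kafka machines could not send data to the central host until the service was restarted again. (Root cause still to be investigated.)

    Restart command: service gmond restart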

     

    0x05 High latency when consuming with the official Storm Kafka spout

            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>0.9.3</version>
                <scope>compile</scope>
            </dependency>
    
    [INFO 2016-08-26 10:19:04 s.k.ZkCoordinator:89 kafkaStormSpout:3-MultipleThreadSpoutExecutors] Task [1/1] Deleted partition managers: []
    [INFO 2016-08-26 10:19:04 s.k.ZkCoordinator:95 kafkaStormSpout:3-MultipleThreadSpoutExecutors] Task [1/1] New partition managers: []
    [INFO 2016-08-26 10:19:04 s.k.ZkCoordinator:106 kafkaStormSpout:3-MultipleThreadSpoutExecutors] Task [1/1] Finished refreshing
    [INFO 2016-08-26 10:19:22 c.s.i.k.DelayBolt:69 delayBolt:2-BoltExecutors] >30s|>1min|>2min|>3min|
    [INFO 2016-08-26 10:19:22 c.s.i.k.DelayBolt:70 delayBolt:2-BoltExecutors] ---|---|---|---|
    [INFO 2016-08-26 10:19:22 c.s.i.k.DelayBolt:71 delayBolt:2-BoltExecutors] 85676|60994|48271|725023|
    [INFO 2016-08-26 10:19:22 c.s.i.k.DelayBolt:72 delayBolt:2-BoltExecutors] =======================
    [INFO 2016-08-26 10:19:22 c.s.i.k.DelayBolt:73 delayBolt:2-BoltExecutors] average delay:532830 ms, messageCount:1000000.
    [ERROR 2016-08-26 10:19:41 o.a.c.ConnectionState:201 CuratorFramework-0] Connection timed out for connection string (10.39.1.66:22181,10.39.1.67:22181,10.39.1.68:22181) and timeout (15000) / elapsed (19049)
    org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
            at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:198) [curator-client-2.5.0.jar:na]
            at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:88) [curator-client-2.5.0.jar:na]
            at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115) [curator-client-2.5.0.jar:na]
            at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:807) [curator-framework-2.5.0.jar:na]
            at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:793) [curator-framework-2.5.0.jar:na]
            at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$400(CuratorFrameworkImpl.java:57) [curator-framework-2.5.0.jar:na]
            at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:275) [curator-framework-2.5.0.jar:na]
            at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_67]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
            at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
    [INFO 2016-08-26 10:20:10 s.k.ZkCoordinator:78 kafkaStormSpout:3-MultipleThreadSpoutExecutors] Task [1/1] Refreshing partition manager connections
    [INFO 2016-08-26 10:20:10 s.k.DynamicBrokersReader:83 kafkaStormSpout:3-MultipleThreadSpoutExecutors] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=yz48155.hadoop.data.sina.com.cn:19092,..... 23=yz48160.hadoop.data.sina.com.cn:19092}}
    

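    Symptom: once this exception has been thrown, the spout stops consuming data for close to 20 minutes, which causes a large backlog. A consumer program I wrote myself shows much lower latency on the same Kafka data. Full GC (FGC) also keeps occurring, about once every 5 s.

    Analysis: this exception is not the reason the Kafka data goes unconsumed. @fengchao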

     

    0x06 OOM when JStorm consumes a Kafka topic

    [ERROR 2016-08-25 11:39:39 c.a.j.t.e.s.SpoutExecutors:178 KAFKA_SPOUT:3-MultipleThreadSpoutExecutors] spout execute error 
    java.lang.OutOfMemoryError: PermGen space
            at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.7.0_67]
            at java.lang.ClassLoader.defineClass(ClassLoader.java:800) ~[na:1.7.0_67]
    ...
    

    Worker configuration: worker.memory.size: 419430400

    Analysis

    1. Inspect the memory of the process: jmap -heap $PID
    Attaching to process ID 2543, please wait...
    Debugger attached successfully.
    Server compiler detected.
    JVM version is 24.65-b04
    
    using parallel threads in the new generation.
    using thread-local object allocation.
    Concurrent Mark-Sweep GC
    
    Heap Configuration:
       MinHeapFreeRatio = 40
       MaxHeapFreeRatio = 70
       MaxHeapSize      = 2147483648 (2048.0MB)
       NewSize          = 209715200 (200.0MB)
       MaxNewSize       = 209715200 (200.0MB)
       OldSize          = 5439488 (5.1875MB)
       NewRatio         = 2
       SurvivorRatio    = 4
       PermSize         = 67108864 (64.0MB)
       MaxPermSize      = 134217728 (128.0MB)
       G1HeapRegionSize = 0 (0.0MB)
    
    Heap Usage:
    New Generation (Eden + 1 Survivor Space):
       capacity = 174784512 (166.6875MB)
       used     = 174769048 (166.6727523803711MB)
       free     = 15464 (0.01474761962890625MB)
       99.99115253415589% used
    Eden Space:
       capacity = 139853824 (133.375MB)
       used     = 139853824 (133.375MB)
       free     = 0 (0.0MB)
       100.0% used
    From Space:
       capacity = 34930688 (33.3125MB)
       used     = 34915224 (33.297752380371094MB)
       free     = 15464 (0.01474761962890625MB)
       99.9557294720333% used
    To Space:
       capacity = 34930688 (33.3125MB)
       used     = 0 (0.0MB)
       free     = 34930688 (33.3125MB)
       0.0% used
    concurrent mark-sweep generation:
       capacity = 1937768448 (1848.0MB)
       used     = 1937768408 (1847.9999618530273MB)
       free     = 40 (3.814697265625E-5MB)
       99.99999793576988% used
    Perm Generation:
       capacity = 67108864 (64.0MB)
       used     = 30199864 (28.80083465576172MB)
       free     = 36909000 (35.19916534423828MB)
       45.001304149627686% used
    
    7935 interned Strings occupying 854144 bytes.
    

    The output above was captured after changing the worker.childopts parameter in storm.yaml.

    worker.childopts: "-Xms1g -Xmx1g -Xmn372m -XX:PermSize=64M -XX:MaxPermSize=64M -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=8 -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=85"
    
    

    The cause: the Perm Generation previously defaulted to 24M, and at startup its usage reached 99.9%, so the application misbehaved and produced no data. The fix is to enlarge the Perm space.
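    Checking which memory space is close to full can be automated from the jmap -heap text. The following Python sketch is an illustration only: heap_usage_percent and nearly_full are hypothetical helper names, and the parsing assumes the section layout of the output shown above (a section name ending in a colon, followed by capacity and used lines).

```python
import re

VALUE_LINE = re.compile(r"(capacity|used)\s*=\s*(\d+)")


def heap_usage_percent(jmap_text):
    """Map each section of `jmap -heap` output to its used percentage."""
    usage = {}
    section = None
    capacity = None
    for raw_line in jmap_text.splitlines():
        line = raw_line.strip()
        # A section header such as "Perm Generation:" carries no "=".
        if line.endswith(":") and "=" not in line:
            section = line[:-1]
            capacity = None
            continue
        match = VALUE_LINE.match(line)
        if match is None or section is None:
            continue
        if match.group(1) == "capacity":
            capacity = int(match.group(2))
        elif capacity:  # "used" line with a known, non-zero capacity
            usage[section] = 100.0 * int(match.group(2)) / capacity
    return usage


def nearly_full(usage, threshold=99.0):
    """Return the names of the spaces at or above the threshold percentage."""
    return sorted(name for name, percent in usage.items() if percent >= threshold)
```

    Run against output captured before the fix, a check like this would be expected to list Perm Generation, matching the 99.9% figure quoted above.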

    0x07 Snappy-java fails on Mac OS JDK 1.7

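    Our in-house Kafka consumer wrapper fails when run locally on a Mac: it throws an exception once and then cannot consume any data.

    Fix: downgrade the JDK to 1.6.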

     

    0x08 Topic latency

    EA_EXPOSURE:1000001

    30s     1min   2min  3min
    507758  25978  0     0
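    Counts like these can be produced by bucketing per-message delays. The sketch below is an assumption about how such a report might be computed, not the code that generated the figures above: delay_report is a hypothetical name, and it assumes each message is counted once, in the largest threshold it exceeds, with messages under 30 s left uncounted.

```python
# Thresholds in milliseconds, labelled like the log header shown earlier.
BUCKETS_MS = [(">30s", 30_000), (">1min", 60_000),
              (">2min", 120_000), (">3min", 180_000)]


def delay_report(delays_ms):
    """Return (per-bucket counts, average delay in ms) for a list of delays."""
    counts = {label: 0 for label, _ in BUCKETS_MS}
    for delay in delays_ms:
        # Walk the thresholds from largest to smallest and count the first
        # one the delay exceeds, so each message lands in one bucket only.
        for label, threshold in reversed(BUCKETS_MS):
            if delay > threshold:
                counts[label] += 1
                break
    average = sum(delays_ms) / len(delays_ms) if delays_ms else 0.0
    return counts, average
```

    For example, delay_report([1000, 31000, 61000]) counts one message in each of the first two buckets and none in the others.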

     

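    0x09 Estimating the log size of a Kafka topic

    • Find one machine that hosts a partition of the topic
    • Run ls /data0/kafka/data* to locate the topic, measure a single partition, and extrapolate to the size of the whole topic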
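    The estimate described in the two steps above can be sketched in Python. The function names are hypothetical, and the sketch assumes that partitions are roughly equal in size and that the data directories contain one sub-directory per partition named topic-partition, which is how Kafka lays out its log directories. The result covers a single replica of each partition.

```python
import os


def dir_size_bytes(path):
    """Sum the sizes of all files below path."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


def estimate_topic_bytes(data_dirs, topic, partition_count):
    """Measure the first partition found and scale by the partition count.

    Returns None if no partition directory of the topic is on this machine.
    """
    prefix = topic + "-"
    for data_dir in data_dirs:
        for entry in sorted(os.listdir(data_dir)):
            # Accept "<topic>-<digits>" only, so a topic named "a" does not
            # pick up directories of a topic named "a-b".
            if entry.startswith(prefix) and entry[len(prefix):].isdigit():
                sample = dir_size_bytes(os.path.join(data_dir, entry))
                return sample * partition_count
    return None
```

    A call might look like estimate_topic_bytes(["/data0/kafka/data1"], "ols_test", 24), where the directory name and the partition count are placeholders for the real values.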

     

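    0x1A Too many consumers of a topic saturate outbound traffic, making the Kafka proxy unusable

    Analysis
    The open question is how to trace a consumer group back to its OLS program, find the people responsible, and ask them to fix it.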

     

    0x1B Druid unable to consume topic wb_ad_druid_analysis for a period of time (consumer group id: druid-2)

    2016-07-21T12:48:02,533 WARN [druid-2_yz2138.hadoop.data.sina.com.cn-1465730148608-f3c110a0-leader-finder-thread] kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 5439 for topics [Set(wb_ad_druid_analysis)] from broker [id:48152,host:yz48152.hadoop.data.sina.com.cn,port:19092] failed
    java.lang.ArrayIndexOutOfBoundsException: 13
            at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38) ~[kafka_2.10-0.8.2.1.jar:?]
            at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) ~[scala-library-2.10.4.jar:?]
            at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36) ~[kafka_2.10-0.8.2.1.jar:?]
            at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) ~[kafka_2.10-0.8.2.1.jar:?]
            at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31) ~[kafka_2.10-0.8.2.1.jar:?]
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
            at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.4.jar:?]
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
            at scala.collection.AbstractTraversable.map(Traversable.scala:105) ~[scala-library-2.10.4.jar:?]
            at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31) ~[kafka_2.10-0.8.2.1.jar:?]
            at kafka.producer.SyncProducer.send(SyncProducer.scala:114) ~[kafka_2.10-0.8.2.1.jar:?]
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) [kafka_2.10-0.8.2.1.jar:?]
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) [kafka_2.10-0.8.2.1.jar:?]
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.10-0.8.2.1.jar:?]
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [kafka_2.10-0.8.2.1.jar:?]
    

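    Analysis

    Our Kafka cluster runs kafka-0.8.0-beta1, while Druid currently uses kafka_2.10-0.8.2.1.jar, so the versions do not match. Switch to the client version that corresponds to the cluster.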

     

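    0x1C OLS program consumes a topic inefficiently

    The process method calls string.match.
    Under the hood this goes through the regular-expression API, and compiling the pattern is very expensive; the compile step should be moved out of the process method.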
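    The refactoring suggested above is to compile the pattern once and reuse it for every record. The following Python sketch shows the shape of that change; it is an analogue, not the OLS code, which is presumably Java. The pattern and both function names are made up for illustration. Note that Python's re module caches compiled patterns internally, so the gain is smaller here than in a runtime that recompiles on every call.

```python
import re


def process_recompiling(record):
    """Before: the pattern string is handed to the regex engine on every call."""
    return re.fullmatch(r"\d+\|[a-z_]+\|.*", record) is not None


# After: compile once at module load and reuse the compiled object.
# The expression itself is a placeholder; the real one is not in the notes.
RECORD_PATTERN = re.compile(r"\d+\|[a-z_]+\|.*")


def process(record):
    """Same check as above, without per-call pattern handling."""
    return RECORD_PATTERN.fullmatch(record) is not None
```

    Both functions return the same result for any input; only the place where the pattern is prepared differs.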

    With jstack, focus on the threads in the RUNNABLE state.

    • Options: -l (long listing) prints additional information about locks, e.g. jstack -l $pid

     

    0x1D Consumer Instance Owner is None for a consumed topic; rebalance fails

    1. Symptom
    Topic weibo_common_act2 is consumed by consumer clientSearchBhvGp.

    2016-06-23 15:52:31,473 ERROR kafka.consumer.ZookeeperConsumerConnector: [clientSearchBhvGp_yz4834.hadoop.data.sina.com.cn-1466668272656-90a8bbdc], error during syncedRebalance
    kafka.common.ConsumerRebalanceFailedException: clientSearchBhvGp_yz4834.hadoop.data.sina.com.cn-1466668272656-90a8bbdc can't rebalance after 4 retries
            at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:397)
            at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326)
    

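    After 4 failed rebalance attempts, this process still held 6 partitions, so those 6 partitions could not be consumed.

    The jstack output follows. The threads are waiting on a lock but are not deadlocked; they are waiting to be assigned partitions to consume.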

    "in1 Fetch thread" daemon prio=10 tid=0x00007f564c866800 nid=0xbe85 waiting on condition [0x00007f5641015000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000000b1fb92f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
            at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
            at com.sina.ols.apu.connector.impl.kafka.KafkaInConnector.fetch(KafkaInConnector.java:107)
            at com.sina.ols.apu.connector.AbstractInConnector$Fetch.run(AbstractInConnector.java:121)
            at java.lang.Thread.run(Thread.java:745)
    
       Locked ownable synchronizers:
            - None
    
    "pool-3-thread-6" prio=10 tid=0x00007f564c865000 nid=0xbe84 waiting on condition [0x00007f5641116000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000000b5d4f138> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
            at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
            at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
            at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
            at com.sina.ols.apu.connector.impl.kafka.KafkaInConnector$KafkaConsumerWorker.run(KafkaInConnector.java:136)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
            at java.util.concurrent.FutureTask.run(FutureTask.java:262)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    
    

    The output of netstat -nalp|grep 48326 shows that the process has established connections to 6 brokers.

    tcp        0      0 ::ffff:10.39.48.34:36474    ::ffff:10.39.4.203:19092    ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:43536    ::ffff:10.39.4.208:19092    ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:50777    ::ffff:10.39.4.211:19092    ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:50027    ::ffff:10.39.4.207:19092    ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:48512    ::ffff:10.39.1.69:22181     ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:58868    ::ffff:10.39.48.34:34070    ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:41300    ::ffff:10.39.4.202:19092    ESTABLISHED 48326/java
    tcp        0      0 ::ffff:10.39.48.34:37169    ::ffff:10.39.4.206:19092    ESTABLISHED 48326/java
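    Counting the broker connections by hand is error-prone when the list is long. The following Python sketch does the count from the netstat text; broker_peers is a hypothetical helper, and it assumes the column layout shown above and that brokers listen on port 19092, as they do in this cluster.

```python
def broker_peers(netstat_output, broker_port=19092):
    """Return the distinct remote hosts with ESTABLISHED broker connections."""
    peers = set()
    for line in netstat_output.splitlines():
        fields = line.split()
        # Expected columns: proto, recv-q, send-q, local, foreign, state, pid.
        if len(fields) < 6 or fields[5] != "ESTABLISHED":
            continue
        host, _, port = fields[4].rpartition(":")
        if port == str(broker_port):
            # Drop the IPv4-mapped IPv6 prefix, e.g. ::ffff:10.39.4.203
            peers.add(host.replace("::ffff:", ""))
    return sorted(peers)
```

    For the output above, the ZooKeeper connection on port 22181 and the local connection are skipped, which leaves the 6 broker addresses.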
    

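    2. Analysis

    Sleep time between rebalance retries: kafka/consumer/ZookeeperConsumerConnector.scala:393

    • "rebalance.backoff.ms" defaults to the value of "zookeeper.sync.time.ms", which itself defaults to 2000

    Once the rebalance has been retried more than 4 times, syncedRebalance throws a RuntimeException. The code at the location below catches that exception and only logs this ERROR.

    • kafka/consumer/ZookeeperConsumerConnector.scala:328. The correct handling is to catch the RuntimeException and make the JVM process exit via exit(-1). For an OLS program, this causes a new Container to be started so that it keeps running.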

    3. Fix

    • Increase the retry backoff: "rebalance.backoff.ms=5000"
    • Increase the retry count: "rebalance.max.retries=10"
    • Catch "ConsumerRebalanceFailedException" and exit the program.
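    The three measures above combine into a retry loop that backs off between attempts and terminates the process once the retries are exhausted. The Python sketch below only illustrates that control flow; it is not the Kafka consumer code. RebalanceFailed stands in for ConsumerRebalanceFailedException, try_rebalance is a caller-supplied function assumed to return True on success, and exit code 1 is used where the notes mention exit(-1).

```python
import sys
import time


class RebalanceFailed(Exception):
    """Stands in for kafka.common.ConsumerRebalanceFailedException."""


def rebalance_with_retries(try_rebalance, max_retries=10, backoff_ms=5000,
                           sleep=time.sleep):
    """Call try_rebalance() until it returns True; raise after max_retries."""
    for attempt in range(1, max_retries + 1):
        if try_rebalance():
            return attempt
        # Back off before the next attempt so ZooKeeper state can settle.
        sleep(backoff_ms / 1000.0)
    raise RebalanceFailed("can't rebalance after %d retries" % max_retries)


def run_or_exit(try_rebalance, exit_fn=sys.exit, **retry_options):
    """Exit instead of lingering as a consumer that cannot rebalance."""
    try:
        return rebalance_with_retries(try_rebalance, **retry_options)
    except RebalanceFailed as error:
        print("fatal: %s" % error, file=sys.stderr)
        # A non-zero exit lets the supervisor (for OLS, a new Container)
        # restart the consumer rather than leaving partitions stuck.
        exit_fn(1)
```

    The sleep and exit_fn parameters exist only so the loop can be exercised without real waiting or a real process exit.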

    4. How to modify an OLS program

    Users change their program in 2 steps:

    1. Change the OLS_Yarn dependency in pom.xml to 0.2.2.2
      <dependency>
          <groupId>com.sina.mis</groupId>
          <artifactId>OLS_Yarn</artifactId>
          <version>0.2.2.2</version>
      </dependency>
      
    2. Add the following to the submitted workflow.xml
      ols.kafka.property.rebalance.backoff.ms=5000,ols.kafka.property.rebalance.max.retries=10

     

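    0x1E With Storm consuming Kafka, nodes under /consumers/onlineGroupId_rtups/owners/clickstream/ are frequently lost and then recreated

    Analysis

    The Storm cluster itself is heavily loaded, which causes its ZooKeeper connections to time out. Increasing zookeeper.session.time.out eases the problem but does not fix the root cause.

    Oddity: with zookeeper.session.time.out=30, the ZooKeeper nodes are lost and recreated after 9 s, 24 s, 43 s, and so on. Cause still to be investigated. TODO 2016-8-12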

     
  • Original article: https://www.cnblogs.com/JetpropelledSnake/p/14179448.html