zoukankan      html  css  js  c++  java
  • Kafka的生产者和消费者代码解析

    1:Kafka名词解释和工作方式
        1.1:Producer :消息生产者,就是向kafka broker发消息的客户端。
        1.2:Consumer :消息消费者,向kafka broker取消息的客户端
        1.3:Topic :可以理解为一个队列。
        1.4:Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
        1.5:Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
        1.6:Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
        1.7:Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

    2:Consumer与topic关系?本质上kafka只支持Topic。
      2.1:每个group中可以有多个consumer,每个consumer属于一个consumer group;
    通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
    2.2:对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;
    那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
    2.3:在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
    一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。
    2.4:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
      2.5:kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。

    3:Kafka消息的分发,Producer客户端负责消息的分发。
      3.1:kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息;
    3.2:当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
    3.3:消息由producer直接通过socket发送到broker,中间不会经过任何"路由层",事实上,消息被路由到哪个partition上由producer客户端决定;
          比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。
    3.4:在producer端的配置文件中,开发者可以指定partition路由的方式。
      3.5:Producer消息发送的应答机制:
        设置发送数据是否需要服务端的反馈,三个值0,1,-1。
        0: producer不会等待broker发送ack。
        1: 当leader接收到消息之后发送ack。
        -1: 当所有的follower都同步消息成功后发送ack。
            request.required.acks=0。

    4:Consumer的负载均衡:
      当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力:

     步骤如下:
    a、假如topic1,具有如下partitions: P0,P1,P2,P3。
    b、加入group中,有如下consumer: C1,C2。
    c、首先根据partition索引号对partitions排序: P0,P1,P2,P3。
    d、根据consumer.id排序: C0,C1。
    e、计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)。
    f、然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]。

    6:Kafka文件存储基本结构:

      6.1:在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
      6.2:每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。默认保留7天的数据。

      6.3:每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。(什么时候创建,什么时候删除)。


    1:使用Idea进行开发,源码如下所示,首先加入Kafka必须依赖的包,这句话意味着你必须要先在Idea上面搭建好的你的maven环境:

    pom.xml如下所示内容:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <modelVersion>4.0.0</modelVersion>
     6 
     7     <groupId>com.bie</groupId>
     8     <artifactId>storm</artifactId>
     9     <version>1.0-SNAPSHOT</version>
    10 
    11     <!-- storm的依赖关系 -->
    12     <dependencies>
    13         <!--storm依赖的包-->
    14         <dependency>
    15             <groupId>org.apache.storm</groupId>
    16             <artifactId>storm-core</artifactId>
    17             <version>0.9.5</version>
    18             <!--<scope>provided</scope>-->
    19         </dependency>
    20         <!-- kafka依赖的包-->
    21         <dependency>
    22             <groupId>org.apache.kafka</groupId>
    23             <artifactId>kafka_2.8.2</artifactId>
    24             <version>0.8.1</version>
    25             <exclusions>
    26                 <exclusion>
    27                     <artifactId>jmxtools</artifactId>
    28                     <groupId>com.sun.jdmk</groupId>
    29                 </exclusion>
    30                 <exclusion>
    31                     <artifactId>jmxri</artifactId>
    32                     <groupId>com.sun.jmx</groupId>
    33                 </exclusion>
    34                 <exclusion>
    35                     <artifactId>jms</artifactId>
    36                     <groupId>javax.jms</groupId>
    37                 </exclusion>
    38                 <exclusion>
    39                     <groupId>org.apache.zookeeper</groupId>
    40                     <artifactId>zookeeper</artifactId>
    41                 </exclusion>
    42                 <exclusion>
    43                     <groupId>org.slf4j</groupId>
    44                     <artifactId>slf4j-log4j12</artifactId>
    45                 </exclusion>
    46                 <exclusion>
    47                     <groupId>org.slf4j</groupId>
    48                     <artifactId>slf4j-api</artifactId>
    49                 </exclusion>
    50             </exclusions>
    51         </dependency>
    52     </dependencies>
    53 
    54     <!--如果依赖外部包,就打不进去外部包,所以需要引入下面所示-->
    55     <build>
    56         <plugins>
    57             <plugin>
    58                 <!--把其他外部依赖的jar包打成一个大jar包-->
    59                 <artifactId>maven-assembly-plugin</artifactId>
    60                 <configuration>
    61                     <descriptorRefs>
    62                         <descriptorRef>jar-with-dependencies</descriptorRef>
    63                     </descriptorRefs>
    64                     <archive>
    65                         <manifest>
    66                             <mainClass>com.bie.wordcount.WordCountTopologyMain</mainClass>
    67                         </manifest>
    68                     </archive>
    69                 </configuration>
    70                 <executions>
    71                     <execution>
    72                         <id>make-assembly</id>
    73                         <phase>package</phase>
    74                         <goals>
    75                             <goal>single</goal>
    76                         </goals>
    77                     </execution>
    78                 </executions>
    79             </plugin>
    80             <plugin>
    81                 <groupId>org.apache.maven.plugins</groupId>
    82                 <artifactId>maven-compiler-plugin</artifactId>
    83                 <configuration>
    84                     <source>1.7</source>
    85                     <target>1.7</target>
    86                 </configuration>
    87             </plugin>
    88         </plugins>
    89     </build>
    90 
    91 
    92 </project>

    然后呢,书写你的生产者源码,如下所示:

    package com.bie.kafka;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import java.util.Properties;
    import java.util.UUID;
    
    /**
     * 这是一个简单的Kafka producer代码
     * 包含两个功能:
     * 1、数据发送
     * 2、数据按照自定义的partition策略进行发送
     *
     *
     * KafkaSpout的类
     */
    public class KafkaProducerSimple {
    
    
        public static void main(String[] args) {
            /**
             * 1、指定当前kafka producer生产的数据的目的地
             *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
             *  bin/kafka-topics.sh --create --zookeeper master:2181
             *  --replication-factor 1 --partitions 1 --topic orderMq
             */
            String TOPIC = "orderMq8";
            /**
             * 2、读取配置文件
             */
            Properties props = new Properties();
            /*
             * key.serializer.class默认为serializer.class
             */
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            /*
             * kafka broker对应的主机,格式为host1:port1,host2:port2
             */
            props.put("metadata.broker.list", "master:9092,slaver1:9092,slaver2:9092");
            /*
             * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
             * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
             * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
             * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
             * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
             * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
             * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
             * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
             */
            props.put("request.required.acks", "1");
            /*
             * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
             * 默认值:kafka.producer.DefaultPartitioner
             * 用来把消息分到各个partition中,默认行为是对key进行hash。
             */
            props.put("partitioner.class", "com.bie.kafka.MyLogPartitioner");
            //props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
            /**
             * 3、通过配置文件,创建生产者
             */
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            /**
             * 4、通过for循环生产数据
             */
            for (int messageNo = 1; messageNo < 100000; messageNo++) {
                String messageStr = new String(messageNo + "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey," +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
                        "用来配合自定义的MyLogPartitioner进行数据分发");
    
                /**
                 * 5、调用producer的send方法发送数据
                 * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
                 */
                producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + messageStr));
    
                //producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "biexiansheng"));
            }
        }
    }

    生产者需要的Partitioner如下所示内容:

    package com.bie.kafka;
    
    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
    import org.apache.log4j.Logger;
    
    
    public class MyLogPartitioner implements Partitioner {
        private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
    
        public MyLogPartitioner(VerifiableProperties props) {
        }
    
        public int partition(Object obj, int numPartitions) {
            return Integer.parseInt(obj.toString())%numPartitions;
    //        return 1;
        }
    
    }

    生产者运行效果如下所示:

    消费者代码如下所示:

    package com.bie.kafka;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class KafkaConsumerSimple implements Runnable {
        public String title;
        public KafkaStream<byte[], byte[]> stream;
        public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
            this.title = title;
            this.stream = stream;
        }
        @Override
        public void run() {
            System.out.println("开始运行 " + title);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            /**
             * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
             * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
             * */
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> data = it.next();
                Object topic = data.topic();
                int partition = data.partition();
                long offset = data.offset();
                String msg = new String(data.message());
                System.out.println(String.format(
                        "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                        title, topic, partition, offset, msg));
            }
            System.out.println(String.format("Consumer: [%s] exiting ...", title));
        }
    
        public static void main(String[] args) throws Exception{
            Properties props = new Properties();
            props.put("group.id", "biexiansheng");
            props.put("zookeeper.connect", "master:2181,slaver1:2181,slaver2:2181");
            props.put("auto.offset.reset", "largest");
            props.put("auto.commit.interval.ms", "1000");
            props.put("partition.assignment.strategy", "roundrobin");
            ConsumerConfig config = new ConsumerConfig(props);
            String topic1 = "orderMq8";
            //String topic2 = "paymentMq";
            //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
            ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
            //定义一个map
            Map<String, Integer> topicCountMap = new HashMap<>();
            topicCountMap.put(topic1, 3);
            //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
            Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
            //取出 `kafkaTest` 对应的 streams
            List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
            //创建一个容量为4的线程池
            ExecutorService executor = Executors.newFixedThreadPool(3);
            //创建20个consumer threads
            for (int i = 0; i < streams.size(); i++) {
                executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
            }
        }
    }

    消费者运行如下所示:

    运行消费者出现下面的错误,解决方法将pomx.ml里面的zookeeper配置注释了即可:

     错误如下所示:

     1 D:softJavajdk1.7.0_80injava -javaagent:E:360Downloadsidealibidea_rt.jar=61635:E:360Downloadsideain -Dfile.encoding=UTF-8 -classpath D:softJavajdk1.7.0_80jrelibcharsets.jar;D:softJavajdk1.7.0_80jrelibdeploy.jar;D:softJavajdk1.7.0_80jrelibextaccess-bridge-64.jar;D:softJavajdk1.7.0_80jrelibextdnsns.jar;D:softJavajdk1.7.0_80jrelibextjaccess.jar;D:softJavajdk1.7.0_80jrelibextlocaledata.jar;D:softJavajdk1.7.0_80jrelibextsunec.jar;D:softJavajdk1.7.0_80jrelibextsunjce_provider.jar;D:softJavajdk1.7.0_80jrelibextsunmscapi.jar;D:softJavajdk1.7.0_80jrelibextzipfs.jar;D:softJavajdk1.7.0_80jrelibjavaws.jar;D:softJavajdk1.7.0_80jrelibjce.jar;D:softJavajdk1.7.0_80jrelibjfr.jar;D:softJavajdk1.7.0_80jrelibjfxrt.jar;D:softJavajdk1.7.0_80jrelibjsse.jar;D:softJavajdk1.7.0_80jrelibmanagement-agent.jar;D:softJavajdk1.7.0_80jrelibplugin.jar;D:softJavajdk1.7.0_80jrelib
    esources.jar;D:softJavajdk1.7.0_80jrelib
    t.jar;E:360Downloadsideastorm	argetclasses;E:maven
    epositoryorgapachestormstorm-core0.9.5storm-core-0.9.5.jar;E:maven
    epositoryorgclojureclojure1.5.1clojure-1.5.1.jar;E:maven
    epositoryclj-timeclj-time0.4.1clj-time-0.4.1.jar;E:maven
    epositoryjoda-timejoda-time2.0joda-time-2.0.jar;E:maven
    epositorycompojurecompojure1.1.3compojure-1.1.3.jar;E:maven
    epositoryorgclojurecore.incubator0.1.0core.incubator-0.1.0.jar;E:maven
    epositoryorgclojure	ools.macro0.1.0	ools.macro-0.1.0.jar;E:maven
    epositorycloutclout1.0.1clout-1.0.1.jar;E:maven
    epository
    ing
    ing-core1.1.5
    ing-core-1.1.5.jar;E:maven
    epositorycommons-fileuploadcommons-fileupload1.2.1commons-fileupload-1.2.1.jar;E:maven
    epositoryjavaxservletservlet-api2.5servlet-api-2.5.jar;E:maven
    epositoryhiccuphiccup0.3.6hiccup-0.3.6.jar;E:maven
    epository
    ing
    ing-devel0.3.11
    ing-devel-0.3.11.jar;E:maven
    epositoryclj-stacktraceclj-stacktrace0.2.2clj-stacktrace-0.2.2.jar;E:maven
    epository
    ing
    ing-jetty-adapter0.3.11
    ing-jetty-adapter-0.3.11.jar;E:maven
    epository
    ing
    ing-servlet0.3.11
    ing-servlet-0.3.11.jar;E:maven
    epositoryorgmortbayjettyjetty6.1.26jetty-6.1.26.jar;E:maven
    epositoryorgmortbayjettyjetty-util6.1.26jetty-util-6.1.26.jar;E:maven
    epositoryorgclojure	ools.logging0.2.3	ools.logging-0.2.3.jar;E:maven
    epositoryorgclojuremath.numeric-tower0.0.1math.numeric-tower-0.0.1.jar;E:maven
    epositoryorgclojure	ools.cli0.2.4	ools.cli-0.2.4.jar;E:maven
    epositorycommons-iocommons-io2.4commons-io-2.4.jar;E:maven
    epositoryorgapachecommonscommons-exec1.1commons-exec-1.1.jar;E:maven
    epositorycommons-langcommons-lang2.5commons-lang-2.5.jar;E:maven
    epositorycomgooglecodejson-simplejson-simple1.1json-simple-1.1.jar;E:maven
    epositorycom	wittercarbonite1.4.0carbonite-1.4.0.jar;E:maven
    epositorycomesotericsoftwarekryokryo2.21kryo-2.21.jar;E:maven
    epositorycomesotericsoftware
    eflectasm
    eflectasm1.07
    eflectasm-1.07-shaded.jar;E:maven
    epositoryorgow2asmasm4.0asm-4.0.jar;E:maven
    epositorycomesotericsoftwareminlogminlog1.2minlog-1.2.jar;E:maven
    epositoryorgobjenesisobjenesis1.2objenesis-1.2.jar;E:maven
    epositorycom	witterchill-java0.3.5chill-java-0.3.5.jar;E:maven
    epositoryorgyamlsnakeyaml1.11snakeyaml-1.11.jar;E:maven
    epositorycommons-loggingcommons-logging1.1.3commons-logging-1.1.3.jar;E:maven
    epositorycommons-codeccommons-codec1.6commons-codec-1.6.jar;E:maven
    epositorycomgooglecodedisruptordisruptor2.10.1disruptor-2.10.1.jar;E:maven
    epositoryorgjgraphtjgrapht-core0.9.0jgrapht-core-0.9.0.jar;E:maven
    epositorychqoslogbacklogback-classic1.0.13logback-classic-1.0.13.jar;E:maven
    epositorychqoslogbacklogback-core1.0.13logback-core-1.0.13.jar;E:maven
    epositoryorgslf4jslf4j-api1.7.5slf4j-api-1.7.5.jar;E:maven
    epositoryorgslf4jlog4j-over-slf4j1.6.6log4j-over-slf4j-1.6.6.jar;E:maven
    epositoryjlinejline2.11jline-2.11.jar;E:maven
    epositoryorgapachekafkakafka_2.8.20.8.1kafka_2.8.2-0.8.1.jar;E:maven
    epositoryorgscala-langscala-library2.8.2scala-library-2.8.2.jar;E:maven
    epositorycomyammermetricsmetrics-annotation2.2.0metrics-annotation-2.2.0.jar;E:maven
    epositorycomyammermetricsmetrics-core2.2.0metrics-core-2.2.0.jar;E:maven
    epositoryorgxerialsnappysnappy-java1.0.5snappy-java-1.0.5.jar;E:maven
    epository
    etsfjopt-simplejopt-simple3.2jopt-simple-3.2.jar;E:maven
    epositorycom101teczkclient0.3zkclient-0.3.jar;E:maven
    epositorylog4jlog4j1.2.14log4j-1.2.14.jar com.bie.kafka.KafkaConsumerSimple
     2 260  [main] INFO  kafka.utils.VerifiableProperties - Verifying properties
     3 311  [main] INFO  kafka.utils.VerifiableProperties - Property auto.commit.interval.ms is overridden to 1000
     4 311  [main] INFO  kafka.utils.VerifiableProperties - Property auto.offset.reset is overridden to largest
     5 311  [main] INFO  kafka.utils.VerifiableProperties - Property group.id is overridden to biexiansheng
     6 312  [main] WARN  kafka.utils.VerifiableProperties - Property partition.assignment.strategy is not valid
     7 312  [main] INFO  kafka.utils.VerifiableProperties - Property zookeeper.connect is overridden to master:2181,slaver1:2181,slaver2:2181
     8 448  [main] INFO  kafka.consumer.ZookeeperConsumerConnector - [biexiansheng_HY-201707051724-1516692275031-bffb9bfb], Connecting to zookeeper instance at master:2181,slaver1:2181,slaver2:2181
     9 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
    10     at java.lang.ClassLoader.defineClass1(Native Method)
    11     at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    12     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    13     at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    14     at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    15     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    16     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    17     at java.security.AccessController.doPrivileged(Native Method)
    18     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    19     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    20     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    21     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    22     at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
    23     at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
    24     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
    25     at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
    26     at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
    27     at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    28     at com.bie.kafka.KafkaConsumerSimple.main(KafkaConsumerSimple.java:58)
    29 Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
    30     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    31     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    32     at java.security.AccessController.doPrivileged(Native Method)
    33     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    34     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    35     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    36     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    37     ... 19 more
    38 
    39 Process finished with exit code 1

    运行效果如下所示:

     

    待续......

  • 相关阅读:
    CentOS 163 yum 源使用
    RedHat Enterprise Linux 5 安装TFTP服务器和NFS服务器
    Hadoop压缩SNAPPY算法安装
    linux 修改hostname 方法小结
    svnserver配置文件详解
    RedHat启动时sendmail启动慢的解决
    HBase 系统架构
    CentOS 域名解析 配置问题
    yum 安装 gcc
    Missing indirectly referenced artifact com.sun:tools:jar:1.5.0:system 错误
  • 原文地址:https://www.cnblogs.com/biehongli/p/8335538.html
Copyright © 2011-2022 走看看