zoukankan      html  css  js  c++  java
  • kafka 安装以及测试

     1,下载kafka 并进行解压 http://mirrors.cnnic.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

     2,启动Zookeeper  bin/zookeeper-server-start.sh config/zookeeper.properties &

     3,启动kafka   bin/kafka-server-start.sh config/server.properties 

     4,创建 topic  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testweixuan &

     5,  查看创建的topic  bin/kafka-topics.sh --list --zookeeper localhost:2181

          查看topic的详细信息  bin/kafka-topics.sh --describe --zookeeper localhost:2181

    解决报错:

    1. kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

    需要改动config文件夹下的server.properties中的以下两个属性

    zookeeper.connect=localhost:2181改成zookeeper.connect=192.168.1.116 (自己的服务器IP地址):2181
    

    以及默认注释掉的 #host.name=localhost 改成 host.name=192.168.1.116 (自己的服务器IP地址)

    java api 测试kafka

    消费端:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    public class KafkaConsumer {
    
        private final ConsumerConnector consumer;
        public final static String TOPIC = "kafkaToptic2";
        private KafkaConsumer() {
            Properties props = new Properties();
            //zookeeper 配置
            props.put("zookeeper.connect", "ip:2181");
            //group 代表一个消费组
            props.put("group.id", "jd-group");
            //zk连接超时
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            //序列化类
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            ConsumerConfig config = new ConsumerConfig(props);
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        }
    
        void consume() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(TOPIC, new Integer(1));
    
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            Map<String, List<KafkaStream<String, String>>> consumerMap = 
                    consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext())
                System.out.println(it.next().message());
        }
    
        public static void main(String[] args) {
            new KafkaConsumer().consume();
        }
    }

    生产端

    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    /**
     * Hello world!
     *
     */
    public class KafkaProducer 
    {
        private final Producer<String, String> producer;
        public final static String TOPIC = "TEST-TOPIC";
    
        private KafkaProducer(){
            Properties props = new Properties();
            //此处配置的是kafka的端口
            props.put("metadata.broker.list", "ip:9092");
            props.put("zookeeper.connect", "ip:2181");//声明zk
    //        props.put("metadata.broker.list", "localhsot:9092");
    //        props.put("zookeeper.connect", "localhsot:2181");//声明zk
            //配置value的序列化类
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            //配置key的序列化类
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
    
            //request.required.acks
            //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
            //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
            //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
    //        props.put("request.required.acks","-1");
            producer = new Producer<String, String>(new ProducerConfig(props));
        }
    
        void produce() {
            int messageNo = 1000;
            final int COUNT = 10000;
            while (messageNo < COUNT) {
                String key = String.valueOf(messageNo);
                String data = "hello kafka message " + key;
                producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
                System.out.println(data);
                messageNo ++;
            }
        }
    
        public static void main( String[] args )
        {
            new KafkaProducer().produce();
        }
    }

    需要修改的文件是 kafka下的config

      1. 修改配置文件config/server.properties  
      2. broker.id=1 #(唯一) 
      3. host.name=ip 
      4. log.dirs=/usr/local/kafka_2.10-0.8.2.0/log
      5. zookeeper.connect=192.168.1.116 (自己的服务器IP地址):2181

     

  • 相关阅读:
    随题而学(二)多维数组转一维数组
    随题而学(一)
    谁能破解“无法定位程序输入点ucrtbase.abort与动态链接库api-ms-win-crt-runtime-l1-1-0.dll上”
    虚拟机8—tools安装失败
    win7介绍
    win xp安装
    Linux正则表达式,grep总结,sed用法
    Linux将用户添加到组的指令
    xxx is not in the sudoers file.This incident will be reported.的解决方法
    69-70连接查询
  • 原文地址:https://www.cnblogs.com/rxingyue/p/kafka.html
Copyright © 2011-2022 走看看