zoukankan      html  css  js  c++  java
  • kafka java api消费者

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
    Properties props = new Properties();

    // zookeeper 配置
    props.put("zookeeper.connect", "192.168.170.185:2181");

    // 消费者所在组
    props.put("group.id", "testgroup");

    // zk连接超时
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");

    // 序列化类
    props.put("serializer.class", "kafka.serializer.StringEncoder");

    ConsumerConfig config = new ConsumerConfig(props);

    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
    consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
    ConsumerIterator<String, String> it = stream.iterator();

    int messageCount = 0;
    while (it.hasNext()){
    System.out.println(it.next().message());
    messageCount++;
    if(messageCount == 100){
    System.out.println("Consumer端一共消费了" + messageCount + "条消息!");
    }
    }
    }

    public static void main(String[] args) {
    new KafkaConsumer().consume();
    }
    }

  • 相关阅读:
    Vue路由机制
    谷歌浏览器打不开应用商店的解决方法
    Vue报错——Component template should contain exactly one root element. If you are using vif on multiple elements, use velseif to chain them instead.
    Vue.js学习之——安装
    Vue使用axios无法读取data的解决办法
    关于localstorage存储JSON对象的问题
    2013年整体计划
    个人喜欢的警语收集
    Linux防火墙的关闭和开启
    Flex修改title 转载
  • 原文地址:https://www.cnblogs.com/wangjing666/p/6860751.html
Copyright © 2011-2022 走看看