zoukankan      html  css  js  c++  java
  • Kafka consumer -- Java (no Spring Boot)

    依赖:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.1.1</version>
            </dependency>
          

    consumer.properties

    bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    group.id=com.sea
    enable.auto.commit=true
    auto.commit.interval.ms=1000
        /**
         * Consumes records from Kafka and prints each record value to standard output.
         *
         * <p>The consumer configuration is read from {@code consumer.properties} on the
         * context class loader's classpath. The method subscribes to the topic named by
         * {@code Names.TOPIC.getValue()} and then polls in an endless loop, so it only
         * returns when an exception ends the loop. Any exception is caught and its stack
         * trace printed; the consumer is closed before the method returns.
         */
        public void consume() {

            try {

                // Load the consumer configuration; close the stream as soon as it is read.
                Properties prop = new Properties();
                try (java.io.InputStream in = Thread.currentThread().getContextClassLoader()
                        .getResourceAsStream("consumer.properties")) {
                    if (in == null) {
                        // getResourceAsStream returns null when the resource is missing;
                        // fail with a clear message instead of an NPE inside Properties.load.
                        throw new IllegalStateException("consumer.properties not found on classpath");
                    }
                    prop.load(in);
                }

                // Per the original author's note, the topic carries data collected by Flume.
                // try-with-resources guarantees the consumer is closed when the loop ends.
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {

                    // Subscribe to the topic.
                    consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));

                    // HBase data-access object (disabled in the original):
                    //   HBaseDao dao = new HBaseDao();
                    //   dao.init();

                    // Consume data. poll(Duration) replaces the deprecated poll(long).
                    while (true) {
                        ConsumerRecords<String, String> consumerRecords =
                                consumer.poll(java.time.Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                            System.out.println(consumerRecord.value());
                            System.err.println("#########################");
                            // Insert into HBase (disabled in the original):
                            //   dao.insertData(consumerRecord.value());
                            //   Calllog log = new Calllog(consumerRecord.value());
                            //   dao.insertData(log);
                        }
                    }
                }
            } catch ( Exception e ) {
                e.printStackTrace();
            }

        }
  • 相关阅读:
    VMware Workstation 6.0 正式版公布
    KMyMoney:全体理财好管家
    Bugzilla 3.0 公布
    Brightside:切换工作区的小东西
    QTM-Blogging 客户端
    MDF2ISO-将 MDF 转换为 ISO
    Yakuake 2.8 beta1
    Red Hat 的 Liberation 字体
    Dictman:有效的词典呆板人
    digiKam 0.9.2 Beta 1
  • 原文地址:https://www.cnblogs.com/lshan/p/12097576.html
Copyright © 2011-2022 走看看