zoukankan      html  css  js  c++  java
  • kafka cusumer --java(no springboot)

    依赖:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.1.1</version>
            </dependency>
          

    consumer.properties

    bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    group.id=com.sea
    enable.auto.commit=true
    auto.commit.interval.ms=1000
        /**
         * 消费数据
         */
        public void consume() {
    
            try {
    
    
                // 创建配置对象
                Properties prop = new Properties();
                prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
    
                // 获取flume采集的数据
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
    
                // 关注主题
                consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));
    
                // Hbase数据访问对象
    //            HBaseDao dao = new HBaseDao();
                // 初始化
    //            dao.init();
    
                // 消费数据
                while ( true ) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println(consumerRecord.value());
                        System.err.println("#########################");
    // 插入数据
    //                    dao.insertData(consumerRecord.value());
                        //Calllog log = new Calllog(consumerRecord.value());
                        //dao.insertData(log);
                    }
                }
            } catch ( Exception e ) {
                e.printStackTrace();
            }
    
        }
  • 相关阅读:
    opencv for java via cmake-gui
    ubuntu的设置里少了好多设置
    vue组件化编程应用2
    vue组件化编程应用
    vue组件化编程
    es6基本介绍及使用
    webpack基本介绍及使用
    npm基本介绍及使用
    node基本介绍及使用
    前后端分离基本介绍
  • 原文地址:https://www.cnblogs.com/lshan/p/12097576.html
Copyright © 2011-2022 走看看