zoukankan      html  css  js  c++  java
  • Kafka consumer -- Java (no Spring Boot)

    依赖:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.1.1</version>
            </dependency>
          

    consumer.properties

    bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    group.id=com.sea
    enable.auto.commit=true
    auto.commit.interval.ms=1000
        /**
         * Consumes records from Kafka and prints each record value to standard output.
         *
         * <p>Consumer settings are read from the classpath resource
         * {@code consumer.properties}. The consumer subscribes to the topic named by
         * {@code Names.TOPIC.getValue()} and then polls in an endless loop, so this
         * method only returns after an exception has been thrown.
         *
         * <p>Any exception (missing or unreadable configuration, Kafka client errors)
         * is caught here and its stack trace is printed; nothing is propagated to the
         * caller. The properties stream and the consumer are always closed on the way
         * out.
         */
        public void consume() {

            try {

                // Load the consumer configuration; close the stream as soon as it is read.
                Properties prop = new Properties();
                try (java.io.InputStream in = Thread.currentThread().getContextClassLoader()
                        .getResourceAsStream("consumer.properties")) {
                    if (in == null) {
                        // Fail with a clear message instead of a NullPointerException from load(null).
                        throw new IllegalStateException("consumer.properties not found on the classpath");
                    }
                    prop.load(in);
                }

                // try-with-resources guarantees the consumer releases its network
                // resources when the loop below is left by an exception.
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {

                    // Subscribe to the topic.
                    consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));

                    // Consume records until an exception ends the loop.
                    while (true) {
                        // poll(long) is deprecated since kafka-clients 2.0; poll(Duration) is its replacement.
                        ConsumerRecords<String, String> consumerRecords =
                                consumer.poll(java.time.Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                            System.out.println(consumerRecord.value());
                            System.err.println("#########################");
                            // NOTE(review): the original kept a commented-out HBase DAO insert
                            // here (dao.insertData(consumerRecord.value())); this is presumably
                            // where records are meant to be persisted.
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
  • 相关阅读:
    OnSize() 与 OnInitDialog()[设置控件大小]
    C库函数中字符串处理函数集合
    智能提示导致Visual Studio 2010崩溃问题
    MFC中关闭窗口的几种方法
    8086寄存器组
    MASM6.1使用方法(适合初学者)
    MultiThread
    汇编语言超浓缩教程
    汇编 ADD与DAA指令
    Function Pointer
  • 原文地址:https://www.cnblogs.com/lshan/p/12097576.html
Copyright © 2011-2022 走看看