zoukankan      html  css  js  c++  java
  • kafka cusumer --java(no springboot)

    依赖:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.1.1</version>
            </dependency>
          

    consumer.properties

    bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    group.id=com.sea
    enable.auto.commit=true
    auto.commit.interval.ms=1000
        /**
         * 消费数据
         */
        public void consume() {
    
            try {
    
    
                // 创建配置对象
                Properties prop = new Properties();
                prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
    
                // 获取flume采集的数据
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
    
                // 关注主题
                consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));
    
                // Hbase数据访问对象
    //            HBaseDao dao = new HBaseDao();
                // 初始化
    //            dao.init();
    
                // 消费数据
                while ( true ) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println(consumerRecord.value());
                        System.err.println("#########################");
    // 插入数据
    //                    dao.insertData(consumerRecord.value());
                        //Calllog log = new Calllog(consumerRecord.value());
                        //dao.insertData(log);
                    }
                }
            } catch ( Exception e ) {
                e.printStackTrace();
            }
    
        }
  • 相关阅读:
    Kibana
    Filebeat使用
    leetcode刷题笔记七十三题 矩阵置零
    leetcode刷题笔记七十二题 编辑距离
    leetcode刷题笔记七十一题 简化路径
    leetcode刷题笔记七十题 爬楼梯
    leetcode刷题笔记六十九题 X的平方根
    python 冒泡算法
    hive 函数
    Task07:类、对象与魔法方法(3天)
  • 原文地址:https://www.cnblogs.com/lshan/p/12097576.html
Copyright © 2011-2022 走看看