zoukankan      html  css  js  c++  java
  • 电信客服_Kafka消费者写入HBase

    流程

    • kafka配置
    • 创建消费者
    • 关注主题ct
    • 获取数据
    • 将数据写入HBase
    consumer.properties是kafka集群的配置信息,calllog是数据封装对象。
    package com.csw.ct.consumer.bean;
    
    import com.csw.ct.common.bean.Consumer;
    import com.csw.ct.common.constant.Names;
    import com.csw.ct.consumer.dao.HBaseDao;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * 通话日志消费者对象
     */
    public class CalllogConsumer implements Consumer {
        /**
         * 消费数据
         */
        public void consume() {
    
            try {
                //创建配置对象
                Properties prop = new Properties ();
                prop.load (Thread.currentThread ().getContextClassLoader ().getResourceAsStream ( "consumer.properties" ));
    
                //获取flume采集的数据
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (prop);
    
                //关注主题
                consumer.subscribe( Arrays.asList( Names.TOPIC.getValue ()));
    
                //HBase数据访问对象
                HBaseDao dao = new HBaseDao ();
                //初始化
                dao.init();
    
                //消费数据
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll ( 100 );
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println (consumerRecord.value());
                        //插入数据
                        //dao.insertDate(consumerRecord.value());
                        Calllog log = new Calllog(consumerRecord.value());
    
                        dao.insertDate (log);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace ();
            }
    
        }
    
    
        public void close() throws IOException {
    
        }
    }
    
    

    写入HBase具体代码
    https://www.cnblogs.com/chenshaowei/p/12736522.html

  • 相关阅读:
    实验一 总结
    C#中将JObject类型数据转换成这样的货币数字-带千分符-四舍五入2位小数
    Git常用命令+报错solution
    Python Requests学习笔记
    Python requests 环境搭建
    关于Page Object个人结合测试框架的一些理解
    REST理解 + API测试学习笔记
    记录组内做API测试的流程
    理解c#的Get Set访问器及测试脚本中的应用
    tp3
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12736462.html
Copyright © 2011-2022 走看看