zoukankan      html  css  js  c++  java
  • 电信客服_Kafka消费者写入HBase

    流程

    • kafka配置
    • 创建消费者
    • 关注主题ct
    • 获取数据
    • 将数据写入HBase
    consumer.properties是kafka集群的配置信息,calllog是数据封装对象。
    package com.csw.ct.consumer.bean;
    
    import com.csw.ct.common.bean.Consumer;
    import com.csw.ct.common.constant.Names;
    import com.csw.ct.consumer.dao.HBaseDao;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * 通话日志消费者对象
     */
    public class CalllogConsumer implements Consumer {
        /**
         * 消费数据
         */
        public void consume() {
    
            try {
                //创建配置对象
                Properties prop = new Properties ();
                prop.load (Thread.currentThread ().getContextClassLoader ().getResourceAsStream ( "consumer.properties" ));
    
                //获取flume采集的数据
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (prop);
    
                //关注主题
                consumer.subscribe( Arrays.asList( Names.TOPIC.getValue ()));
    
                //HBase数据访问对象
                HBaseDao dao = new HBaseDao ();
                //初始化
                dao.init();
    
                //消费数据
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll ( 100 );
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println (consumerRecord.value());
                        //插入数据
                        //dao.insertDate(consumerRecord.value());
                        Calllog log = new Calllog(consumerRecord.value());
    
                        dao.insertDate (log);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace ();
            }
    
        }
    
    
        public void close() throws IOException {
    
        }
    }
    
    

    写入HBase具体代码
    https://www.cnblogs.com/chenshaowei/p/12736522.html

  • 相关阅读:
    微信小程序动态更改样式
    ionic toggle点击返回true/false支持自定义
    ionic 页面传递参数
    ionic 搜索双向数据绑定失效
    关于select的默认样式问题
    nn
    MVC api json 格式
    iis 500 解决方法
    关于qquu8 的主页修改
    CentOS6.5下MAC
  • 原文地址:https://www.cnblogs.com/chenshaowei/p/12736462.html
Copyright © 2011-2022 走看看