zoukankan      html  css  js  c++  java
  • kafka数据定时导入hive便于后续做数据清洗

    问题背景

    kafka数据定时导入到hive,后续做数据清洗:
    flume,confulent都需要单独部署服务,比较繁琐。调查其他可选方案,参考以下文章:参考资料
    综合比较,camus 简单,比较方便接入。主要分两步:
    1、采用mapreduce过程处理数据从kafka导入hadoop
    2、hadoop数据接入hive管理。

    解决过程

    1、下载源码,本地构建jar包。
    参考文章
    camus源码
    2、查看camus.properties配置文件,支持的功能选项
    期间需要自定义input,output encoder,
    需要配置Reader,Writer类,具体参考源码实现。
    3、修改camus.properties配置项,最终结果如下:

    camus.job.name=Camus-Job-Test
    etl.destination.path=/tmp/escheduler/root/resources/topics
    etl.execution.base.path=/tmp/escheduler/root/resources/camus/exec
    etl.execution.history.path=/tmp/escheduler/root/resources/camus/exec/history
    # 新增的自定义decoder
    camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder
    # 修改写入hadoop的writer
    etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
    kafka.client.name=camus
    # broker
    kafka.brokers=....
    # topic
    kafka.whitelist.topics=topic_exec_servicecheck_prod_calConfigId_177
    log4j.configuration=false
    # 禁用压缩,deflate,snappy
    mapred.output.compress=false
    etl.output.codec=deflate
    etl.deflate.level=6
    etl.default.timezone=Asia/Shanghai
    

    4、上传jar,properties文件,执行如下命令:实现kafka数据到hadoop的功能:

    cd /home/app/transform/libs/
    hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
    

    数据导入到hadoop,
    5、数据从hadoop到hive,执行如下脚本:

    date_string=$(date '+%Y/%m/%d/%H') 
    partion=$(date '+%Y-%m-%d_%H')
    topic='topic_exec_servicecheck_prod_calConfigId_177'
    table_name='dwd.test_exec_servicecheck'
    filePath="/tmp/escheduler/root/resources/topics/$topic/hourly/"$date_string"/"
    hive<<EOF
    create table if not exists $table_name(
       date TIMESTAMP,
       node STRING,
       status STRING
    )
    PARTITIONED BY(dt STRING)
    row format delimited 
    fields terminated by '|'  
    STORED AS TEXTFILE;
    load data inpath '$filePath' into table $table_name partition (dt='$partion');
    EOF
    

    6、配置定时调度,按小时执行。

    注意事项

    附自定义decoder

    public class StringMessageDecoder  extends MessageDecoder<Message, String> {
    	private static final org.apache.log4j.Logger log = Logger.getLogger(JsonStringMessageDecoder.class);
    
    	@Override
    	public CamusWrapper<String> decode(Message message) {
    		//log.info(message.getTopic());
    		return new CamusWrapper<String>(new String(message.getPayload()));
    	}
    }
    
    

    hive input/output 支持自定义数据格式,这个是很有意义的,通常来说文本文件,分隔符分割一行,纯文本解析,最简单,但是可读性,可维护性差。
    支持json格式数据写入,json处理相关jar文件 放到${HIVE_HOME}/lib目录。

  • 相关阅读:
    第四次作业—四则运算
    第四次作业—代码规范
    【欢迎来怼】事后诸葛亮会议
    软件工程——第七次作业
    第17次Scrum会议(10/29)【欢迎来怼】
    软件工程——第六次作业
    第10次Scrum会议(10/22)【欢迎来怼】
    软件工程——第五次作业
    欢迎来怼——第四次Scrum会议
    软件工程——第四次作业(3)
  • 原文地址:https://www.cnblogs.com/coding-now/p/14660571.html
Copyright © 2011-2022 走看看