  • [Original] Big Data Basics: Gobblin (2) Persisting Kafka to HDFS

    gobblin 0.10

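    There are many ways to persist Kafka data to HDFS, such as Flume, Logstash, and Gobblin. Flume and Logstash are streaming tools, while Gobblin is batch-oriented: a scheduled job triggers each persistence run, and nothing is read or written between runs. This is the biggest difference from Flume and Logstash.

    Gobblin can be deployed in several ways:

    1) standalone + cron;

    2) MapReduce + Oozie/Azkaban, etc.;

    3) Docker.

    Of these, the third option is the most convenient. Because Gobblin can write all job state to HDFS, it makes no difference which node Gobblin is started on. In addition, the metadata is updated only after the data has been synced, which ensures that no data is lost if Kafka, HDFS, or Gobblin itself fails.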
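
    The "update metadata only after the data is synced" ordering can be illustrated with a small, self-contained Java sketch. This is not Gobblin code: the class, fields, and method below are hypothetical stand-ins (an in-memory list for Kafka, a counter for the state store), meant only to show why a failed run does not lose records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: none of these names come from Gobblin.
final class CommitAfterPublishSketch {
    // Stands in for the offset kept in the state store (on HDFS in the setup described above).
    int committedOffset = 0;
    // Stands in for the published output directory.
    final List<String> published = new ArrayList<>();

    /** Runs one batch over the source log; returns true if the batch was committed. */
    boolean runOnce(List<String> sourceLog, boolean failBeforePublish) {
        // Read everything that arrived since the last committed offset.
        List<String> batch = new ArrayList<>(sourceLog.subList(committedOffset, sourceLog.size()));
        if (failBeforePublish) {
            // Simulated crash: nothing is published and the offset is untouched,
            // so the next run picks up the same records again.
            return false;
        }
        published.addAll(batch);            // step 1: make the data visible
        committedOffset = sourceLog.size(); // step 2: only then advance the offset
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(Arrays.asList("a", "b", "c"));
        CommitAfterPublishSketch job = new CommitAfterPublishSketch();
        job.runOnce(log, true);  // failed run: offset stays at 0
        job.runOnce(log, false); // retry publishes the same three records
        System.out.println(job.published + " offset=" + job.committedOffset);
    }
}
```

    The sketch models only a failure before publishing. A failure between the two steps would cause records to be re-read on the next run, which is the usual at-least-once trade-off of this ordering.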

    1 Configuration

    #job
    job.name=test_job
    job.group=test_group
    job.schedule=0 0 */1 * * ?
    job.lock.enabled=false
    
    #source
    source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
    extract.namespace=gobblin.extract.kafka
    kafka.brokers=$kafka_brokers
    bootstrap.with.offset=latest
    topic.whitelist=$kafka_topics
    
    mr.job.max.mappers=1
    
    #writer
    writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
    writer.file.path.type=tablename
    writer.destination.type=HDFS
    writer.output.format=txt
    writer.partitioner.class=gobblin.writer.partitioner.TimeBasedWriterPartitioner
    writer.partition.columns=time
    writer.partition.level=hourly
    writer.partition.pattern=yyyyMMdd/HH
    writer.partition.timezone=Asia/Shanghai
    data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
    
    #metrics
    metrics.reporting.file.enabled=true
    metrics.reporting.file.suffix=txt
    
    #fs
    fs.uri=hdfs://$name_node:8020
    writer.fs.uri=${fs.uri}
    state.store.fs.uri=${fs.uri}
    
    data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
    metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
    state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store
    mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
    task.data.root.dir=${env:GOBBLIN_WORK_DIR}/task-data

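    Replace $kafka_brokers, $kafka_topics, and $name_node with your own values.

    With this configuration, the standalone job runs once an hour. On each run, records are placed into time partitions derived from the time field in the data and stored under the specified HDFS directory.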
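
    To make the partition layout concrete, the sketch below shows how a record timestamp would map to a partition directory under the pattern yyyyMMdd/HH and the Asia/Shanghai timezone from the configuration. It uses java.time from the JDK and is only an illustration of the mapping, not Gobblin's internal implementation; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of the timestamp-to-partition mapping; not Gobblin code.
final class PartitionPathSketch {
    // Mirrors writer.partition.pattern and writer.partition.timezone from the configuration.
    private static final DateTimeFormatter PATTERN =
            DateTimeFormatter.ofPattern("yyyyMMdd/HH").withZone(ZoneId.of("Asia/Shanghai"));

    /** Formats an epoch-millisecond timestamp as a relative partition path. */
    static String partitionPath(long epochMillis) {
        return PATTERN.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 2019-06-01T00:00:00Z is 08:00 on the same day in Asia/Shanghai.
        System.out.println(partitionPath(1559347200000L)); // prints "20190601/08"
    }
}
```

    Because the timezone is applied before formatting, records whose timestamps fall late in the UTC day land in the next day's directory in Shanghai time.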

    2 Startup

    export GOBBLIN_JOB_CONFIG_DIR=/opt/gobblin/gobblin-dist/job_conf
    export GOBBLIN_WORK_DIR=/opt/gobblin/gobblin-dist/work_dir
    
    bin/gobblin-standalone.sh start

    3 Customization

    1) Partition by the current time instead of the time in the data

    package gobblin.writer.partitioner;
    
    import gobblin.configuration.State;
    
    // Partitions records by the wall-clock time at which they are written,
    // ignoring any timestamp carried in the record itself.
    public class DefaultTimeBasedWriterPartitioner extends TimeBasedWriterPartitioner {
        public DefaultTimeBasedWriterPartitioner(State state, int numBranches, int branchId) {
            super(state, numBranches, branchId);
        }
    
        @Override
        public long getRecordTimestamp(Object record) {
            return System.currentTimeMillis();
        }
    }

    Configuration:

    writer.partitioner.class=gobblin.writer.partitioner.DefaultTimeBasedWriterPartitioner

    2) Keep only JSON records and append a newline to each

    package gobblin.source.extractor.extract.kafka;
    
    import gobblin.configuration.WorkUnitState;
    import gobblin.source.extractor.Extractor;
    
    import java.io.IOException;
    
    public class JsonKafkaSimpleSource extends KafkaSimpleSource {
        public JsonKafkaSimpleSource() {}
        @Override
        public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
            return new JsonKafkaSimpleExtractor(state);
        }
    }
    
    package gobblin.source.extractor.extract.kafka;
    
    import gobblin.configuration.WorkUnitState;
    import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class JsonKafkaSimpleExtractor extends KafkaSimpleExtractor {
        public JsonKafkaSimpleExtractor(WorkUnitState state) {
            super(state);
        }
    
        @Override
        protected byte[] decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
            byte[] resultBytes = kafkaConsumerRecord.getMessageBytes();
            String result = new String(resultBytes, "UTF-8");
            if (result != null && result.length() > 2 && result.charAt(0) == '{' && result.charAt(result.length() - 1) == '}')
                return (result + "\n").getBytes("UTF-8");
            else {
                System.out.println("[" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "]found invalid json : " + result);
                return "".getBytes();
            }
        }
    }

     

    Configuration:

    source.class=gobblin.source.extractor.extract.kafka.JsonKafkaSimpleSource
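
    The filter in decodeRecord can be tried out without Gobblin on the classpath. The sketch below copies the same check into a standalone class (the class and method names are hypothetical) so its behavior on sample inputs is easy to see. Note that it only checks the first and last characters; it does not validate JSON, and "{}" is rejected because of the length > 2 condition.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone copy of the brace check used in JsonKafkaSimpleExtractor.
final class JsonLineFilterSketch {
    /**
     * Returns the payload with a trailing newline if it starts with '{' and ends with '}',
     * otherwise an empty byte array.
     */
    static byte[] toJsonLine(byte[] messageBytes) {
        String text = new String(messageBytes, StandardCharsets.UTF_8);
        boolean looksLikeObject = text.length() > 2
                && text.charAt(0) == '{'
                && text.charAt(text.length() - 1) == '}';
        return looksLikeObject
                ? (text + "\n").getBytes(StandardCharsets.UTF_8)
                : new byte[0];
    }

    public static void main(String[] args) {
        byte[] accepted = toJsonLine("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        byte[] rejected = toJsonLine("not json".getBytes(StandardCharsets.UTF_8));
        // The accepted record gains one byte for the newline; the rejected one becomes empty.
        System.out.println(accepted.length + " " + rejected.length); // prints "8 0"
    }
}
```

    Surrounding whitespace also causes a rejection (for example a payload ending in a newline), so trimming before the check may be worth considering if producers are not strict about formatting.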

    4 Docker image

    https://hub.docker.com/r/gobblin/gobblin-standalone

    docker run -d gobblin/gobblin-standalone:ubuntu-gobblin-0.10.0

    References:

    https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/

    https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/

  • Original article: https://www.cnblogs.com/barneywill/p/10959519.html