  • Kafka + Flume + Kudu: loading Kafka data into Kudu through Flume

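    1. Prerequisites

      This article uses Cloudera Manager to install Kafka, Flume, and Kudu. Note: the nodes must be time-synchronized when Kudu is installed. For how to set up time synchronization, see: https://blog.csdn.net/u014516601/article/details/81433594

    The component versions used in this article are as follows: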

    <flume.version>1.6.0</flume.version>

    <kudu.version>1.7.0</kudu.version>

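    2. Data loading flow
      1. Flume does not ship with Kudu integration, so a third-party jar is required: kudu-flume-sink-1.7.0-cdh5.16.1.jar. Place this jar in Flume's lib directory. For an installation managed by Cloudera Manager, it can go directly in /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/flume-ng/lib
      2. Write a Kudu sink class that implements the KuduOperationsProducer interface. It must override the configure, initialize, getOperations, and close methods. The example code for this article follows: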

    3. Edit the Flume agent file

    # Agent "kafka": one Kafka source feeding two memory channels, each drained by its own Kudu sink
    kafka.sources = kafkasource
    kafka.sinks = kudusink1 kudusink2
    kafka.channels = flumechannel1 flumechannel2

    kafka.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
    kafka.sources.kafkasource.zookeeperConnect = <zookeeper-host>:2182
    kafka.sources.kafkasource.topic = us_general
    kafka.sources.kafkasource.kafka.consumer.timeout.ms = 100
    kafka.sources.kafkasource.kafka.consumer.group.id = flume-kudu
    # Replicating selector: every event is copied to both channels
    kafka.sources.kafkasource.selector.type = replicating
    kafka.sources.kafkasource.channels = flumechannel1 flumechannel2

    kafka.channels.flumechannel1.type = memory
    kafka.channels.flumechannel1.capacity = 10000
    kafka.channels.flumechannel1.transactionCapacity = 100

    kafka.channels.flumechannel2.type = memory
    kafka.channels.flumechannel2.capacity = 10000
    kafka.channels.flumechannel2.transactionCapacity = 100

    # Sink 1 writes to the Kudu table impala::kududb.hisrealinfo1
    kafka.sinks.kudusink1.type = org.apache.kudu.flume.sink.KuduSink
    kafka.sinks.kudusink1.masterAddresses = <kudu-master-host>:7051
    kafka.sinks.kudusink1.tableName = impala::kududb.hisrealinfo1
    kafka.sinks.kudusink1.operation = insert
    kafka.sinks.kudusink1.batchSize = 50
    kafka.sinks.kudusink1.producer = <fully qualified producer class from the KuduSink jar>
    kafka.sinks.kudusink1.channel = flumechannel1

    # Sink 2 writes to the Kudu table impala::kududb.realinfo1
    kafka.sinks.kudusink2.type = org.apache.kudu.flume.sink.KuduSink
    kafka.sinks.kudusink2.masterAddresses = <kudu-master-host>:7051
    kafka.sinks.kudusink2.tableName = impala::kududb.realinfo1
    kafka.sinks.kudusink2.operation = insert
    kafka.sinks.kudusink2.batchSize = 50
    kafka.sinks.kudusink2.producer = <fully qualified producer class from the KuduSink jar>
    kafka.sinks.kudusink2.channel = flumechannel2

    4. Run flume-ng from the command line

    flume-ng agent --conf ./flumekudu/ --conf-file $FLUME_USGENERAL_CONFIG --name kafka -Dflume.root.logger=INFO,console

    Note:

    Running flume-ng from the command line may fail with an out-of-memory error. In that case, increase the JVM heap size.
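
    One way to raise the heap, sketched under assumptions: the flume-ng launcher sources a flume-env.sh file from the directory passed with --conf (./flumekudu/ in the command above) and hands JAVA_OPTS to the JVM. The heap sizes below are illustrative values, not figures from the original article. Size them to the channel capacity and the typical event size.

```shell
# Contents of ./flumekudu/flume-env.sh (assumed location: the --conf directory).
# -Xms sets the initial heap size and -Xmx the maximum heap size.
# Both values are illustrative assumptions; tune them for your workload.
export JAVA_OPTS="-Xms512m -Xmx2g"
```

    After saving the file, restart the agent with the same flume-ng command so the new JVM options take effect.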

  • Original article: https://www.cnblogs.com/tomorrow-hope/p/11492562.html