  • Using Flume to sink data into HBase

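    ===========> First create the HBase table and column family <================
    Case 1: each line of source data is stored in a single HBase column (works without problems on hbase-1.12)
    ================================================================================
    # Note: in this case Flume watches the directory /home/hadoop/flume_hbase and collects its files into HBase; the table and column family must be created in HBase first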

    Data directory:
    vi /home/hadoop/flume_hbase/word.txt
    1001 pan nan
    2200 lili nv

    create 'tb_words','cf_wd'

    vi flume-hbase.conf
    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    #Describe/configure the source
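    # When watching a directory there is no command to run; any file placed in the folder is picked up.
    # (Comments must sit on their own line: a trailing comment would become part of the property value.)
    a1.sources.r1.type = spooldir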
    a1.sources.r1.spoolDir=/home/hadoop/flume_hbase

    # Describe the sink
    a1.sinks.k1.type =asynchbase
    a1.sinks.k1.table = tb_words
    a1.sinks.k1.columnFamily = cf_wd
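    # So far I have only gotten this to work with a single column name; multiple column names failed.
    # For multiple columns, consider the regular-expression approach in the case below.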
    a1.sinks.k1.serializer.payloadColumn=wd
    a1.sinks.k1.serializer.incrementColumn=last
    a1.sinks.k1.serializer.rowPrefix=QM
    a1.sinks.k1.serializer.suffix=timestamp
    a1.sinks.k1.serializer =org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
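
The spooling directory source expects a file to be complete and unchanged once it appears in the watched directory, so a common pattern is to write the file somewhere else and then move it in. Below is a minimal Python sketch of that pattern. The helper name `drop_into_spool` and the `staging_dir` parameter are assumptions made for illustration and do not come from the original post; the staging directory should be on the same filesystem as the spool directory so that the final rename is a single step.

```python
import os
import tempfile


def drop_into_spool(lines, spool_dir, file_name, staging_dir=None):
    """Write lines to a staging file, then move it into spool_dir in one step.

    staging_dir is a hypothetical scratch directory; it defaults to the
    parent of spool_dir so that both live on the same filesystem.
    """
    spool_dir = os.path.abspath(spool_dir)
    staging_dir = staging_dir or os.path.dirname(spool_dir)
    # Write the whole file outside the watched directory first.
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")
    # os.replace is an atomic rename when source and target share a filesystem.
    target = os.path.join(spool_dir, file_name)
    os.replace(tmp_path, target)
    return target
```

For the data in this case, the call would look like `drop_into_spool(["1001 pan nan", "2200 lili nv"], "/home/hadoop/flume_hbase", "word.txt")`. File names should not be reused for later drops, since the spooling directory source treats a reused name as an error.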


    Case 2: use a regular expression to split each line into multiple column values
    Note: uses apache-flume-1.7.0-bin.tar.gz and Hbase-1.12+
    ================================================================================
    create 'tb_words2','words'

    Data directory:
    vi /home/hadoop/flume_hbase/data.txt
    1001,panzong,nan
    2200,lili,nv

    Flume configuration file:
    vi flume_2_hbase.conf
    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    #Describe/configure the source
    # Custom source class; it can be replaced with spooldir
    a1.sources.r1.type = cn.qm.flume.source.MySource
    a1.sources.r1.spoolDir=/home/hadoop/flume_hbase

    # Describe the sink
    #a1.sinks.k1.type =org.apache.flume.sink.hbase.HBaseSink
    a1.sinks.k1.type =hbase
    a1.sinks.k1.table = tb_words2
    a1.sinks.k1.columnFamily = words
    # enableWal is read from the sink's own configuration, not from the serializer.* namespace
    a1.sinks.k1.enableWal = true
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    # Reading the RegexHbaseEventSerializer source is a quick way to understand the rowKeyIndex/colNames properties
    a1.sinks.k1.serializer.regex= ^([0-9]+),([a-z]+),([a-z]+)$
    # Use a specific column as the row key instead of a randomly generated key; here the first column is the HBase rowkey
    # (see the RegexHbaseEventSerializer source)
    a1.sinks.k1.serializer.rowKeyIndex =0
    # ROW_KEY is the reserved column name that marks the row key position
    a1.sinks.k1.serializer.colNames= ROW_KEY,name,sex
    a1.sinks.k1.zookeeperQuorum =hdp-qm-05:2181,hdp-qm-06:2181,hdp-qm-07:2181

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    # Alternative: use the second column as the HBase rowkey
    #a1.sinks.k1.serializer.rowKeyIndex = 1
    #a1.sinks.k1.serializer.regex= ^([0-9]+),([a-z]+),([a-z]+)$
    #a1.sinks.k1.serializer.colNames= id,ROW_KEY,sex
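
To make the relationship between regex, rowKeyIndex and colNames more concrete, here is a small Python model of the mapping as I understand it from the settings above: each capture group is paired with the column name at the same position, and the group at rowKeyIndex becomes the row key instead of a column. This is a simplified illustration and not the Flume source; the function name `map_line` is hypothetical.

```python
import re

ROW_KEY = "ROW_KEY"  # placeholder name that marks the row key slot in colNames


def map_line(line, regex, col_names, row_key_index):
    """Return (row_key, {column: value}) for one line, or None if it does not match."""
    names = [n.strip() for n in col_names.split(",")]
    if names[row_key_index] != ROW_KEY:
        raise ValueError("colNames[rowKeyIndex] must be " + ROW_KEY)
    m = re.fullmatch(regex, line)
    # Skip lines that do not match or whose group count differs from colNames.
    if m is None or len(m.groups()) != len(names):
        return None
    row_key = m.group(row_key_index + 1)  # regex groups are 1-based
    columns = {
        name: m.group(i + 1)
        for i, name in enumerate(names)
        if i != row_key_index
    }
    return row_key, columns


REGEX = r"^([0-9]+),([a-z]+),([a-z]+)$"
print(map_line("1001,panzong,nan", REGEX, "ROW_KEY,name,sex", 0))
# ('1001', {'name': 'panzong', 'sex': 'nan'})
print(map_line("1001,panzong,nan", REGEX, "id,ROW_KEY,sex", 1))
# ('panzong', {'id': '1001', 'sex': 'nan'})
```

In this model, a line that does not fit the pattern (for example the space-separated `1001 pan nan` from case 1) yields no row at all, which is worth checking against the input data before starting the agent.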

  • Original article: https://www.cnblogs.com/pingzizhuanshu/p/9102494.html