zoukankan      html  css  js  c++  java
  • Flume:source和sink

    Flume – 初识flume、source和sink

    目录
    基本概念
    常用源 Source
    常用sink

    基本概念

     什么叫flume?
      分布式,可靠的大量日志收集、聚合和移动工具。

     events
      事件,是一行数据的字节数据,是flume发送文件的基本单位。

     flume配置文件
      重命名flume-env.sh.template为flume-env.sh,并添加[export JAVA_HOME=/soft/jdk]

     flume的Agent
      source //从哪儿读数据。 负责监控并收集数据。相对于channel是生产者。
      channel //数据通道。 通道,相当于数据缓冲区。
      sink //将数据传送往哪儿。 沉槽,负责将数据放置在指定位置。相对于channel是消费者。

     flume如何使用
      在flume的conf文件下,创建conf后缀的文件,使用flume命令启动

     flume命令
      启动:flume-ng agent -f /soft/flume/conf/example.conf -n a1

    常用源 Source

     执行源:Exec Sour //通过linux命令作为source。缺点:失败后数据会丢失,不能保证数据的完整性。
      #定义源:exec
      a1.source.r1.type = exec
      a1.source.r1.command = tail -F /home/centos/1.txt
     滚动目录源:Spooling Directory Source //监控目录,如果目录下有新文件产生,机会将其消费
      #定义源:spoodir
      a1.source.r1.type = spooldir
      #指定监控目录
      a1.source.r1.spoolDir = /home/centos/log
     指定类型的文件:tailDir source #监控目录中指定类型的文件,并监控其消费偏移量;
     通过~/.flume/taildir_position.json监控并实时记录文件偏移量,可通过a1.sources.r1.positionFile配置进行修改
      #定义源:TAILDIR
      a1.source.r1.type = TAILDIR
      #指定监控文件组
      a1.source.r1.filegroups = g1
      #指定g1组中包含的文件
      a1.source.r1.filegroups.g1 = /home/centos/log/.*log
     顺序数字源:Sequence Generator Source //产生顺序数字的源,用作测试
      #定义源:seq
      a1.source.r1.type = seq
      #定义一次RPC产生的批次数量
      a1.source.r1.batchSize = 1024
     压力源:Stress Source //测试集群压力,用作负载测试
      #定义源:stress
      a1.source.r1.type = org.apache.flume.source.StressSource
      #一个event产生的数据量
      a1.source.r1.size = 1073741824

    常用sink

     日志&控制台:logger sink
      a1.sinks.k1.type = logger
     存储在本地文件:File Roll Sink
      #设置滚动文件sink
      a1.sinks.k1.type = file_roll
      #指定文件位置。若文件不存在会报错
      a1.sinks.k1.directory = /home/centos/log2
      #设置滚动周期间隔,0即不滚动;默认30s。
      a1.sinks.k1.sink.rollInterval = 0
     写入到hdfsL:HDFS Sink //默认SequenceFile,可以通过hdfs.fileType指定(SequenceFile, DataStream or CompressedStream)
      #指定类型
      a1.sinks.k1.type = hdfs
      #指定路径,不用单独创建文件夹
      a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H
      #时间相关的配置,必须指定时间戳
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      #实例化文件的前缀
      a1.sinks.k1.hdfs.filePrefix = events-
      #滚动间隔,0为不滚动
      a1.sinks.k1.hdfs.rollInterval = 0
      #滚动大小;默认1024
      a1.sinks.k1.hdfs.rollSize = 1024
      #指定数据类型;默认为 sequenceFile
      a1.sinks.k1.hdfs.fileType = CompressedStream
      #指定压缩编解码器
      a1.sinks.k1.hdfs.codeC = gzip
     写入到Hbase:hbase sink //需要创建表,无法指定rowkey和col
      #设置类型
      a1.sinks.k1.type = hbase
      a1.sinks.k1.table = ns1:flume
      a1.sinks.k1.columnFaminly = f1
     写入到Hbase:regexhbase sink //需要创建表,可以手动指定rowKey和col
      #设置正则hbase类型
      a1.sinks.k1.type = hbase
      a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
      #手动指定rowkey和列,[ROW_KEY]必须些,且大写
      a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
      #指定正则,与col对应
      a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
      #指定rowkey索引
      a1.sinks.k1.serializer.rowKeyIndex = 0
      a1.sinks.k1.table = ns1:flume
      a1.sinks.k1.coluFamily = f1
     写入到Hive:hive sink //需要启动hive的事务性
      # 设置hive sink
      a1.sinks.k1.type = hive
      # 需要启动hive的metastore:hive --service metastore //metastore源数据仓库
      a1.sinks.k1.hive.metastore = thrift://s101:9083
      a1.sinks.k1.hive.database = default
      # 需要创建事务表
      a1.sinks.k1.hive.table = tx2
      # 指定列和字段的映射
      a1.sinks.k1.serializer = DELIMITED
      # 指定输入的格式,必须是双引号
      a1.sinks.k1.serializer.delimiter = " "
      # 指定hive存储文件展现方式,必须是单引号
      a1.sinks.k1.serializer.serdeSeparator = ' '
      a1.sinks.k1.serializer.fieldnames =id,name,age
     写入到hive补充
      1、首先将/soft/hive/hcatalog/share/hcatalog中的所有jar拷贝到hive的lib库中
        cp /soft/hive/hcatalog/share/hcatalog/* /soft/hive/lib/
      2、启动hive的metastore
        hive --service metastore
      3、启动hive并创建事务表
        SET hive.support.concurrency = true;
        SET hive.enforce.bucketing = true;
        SET hive.exec.dynamic.partition.mode = nonstrict;
        SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
        SET hive.compactor.initiator.on = true;
        SET hive.compactor.worker.threads = 1;
        create table tx2(id int ,name string, age int ) clustered by (id) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');
      4、启动flume,并使用以上的配置文件
        flume-ng agent -f k_hive.conf -n a1
      5、输入数据验证
        1 tom 18

  • 相关阅读:
    50.Ext_数字输入框_Ext.form.NumberField
    49.Ext.form.TextField()基本用法
    48.EXt.Data.JsonReader()
    47. Ext.form.Field.prototype.msgTarget
    46. Ext中namespace的作用(转)
    45. ExtJS ComboBox 下拉列表详细用法
    44. Ext信息提示对话框
    43. ExtJs控件属性配置详细
    42.extjs Combobox动态加载数据问题,mode:local 还是remote
    堆叠顺序
  • 原文地址:https://www.cnblogs.com/yiwanfan/p/9470320.html
Copyright © 2011-2022 走看看