zoukankan      html  css  js  c++  java
  • flume的一些使用

    一、第一层采集通道的编写

    1.第一层采集脚本Source的选择
    ①Source:
    数据源在日志文件中!
    读取日志中的数据,可以使用以下Source
    ExecSource: 可以执行一个linux命令,例如tail -f 日志文件,
    讲读取的到的数据封装为Event!
    不用!不安全,可能丢数据!
    SpoolingDirSource: 可以读取一个目录中的文本文件!
    保证目录中没有重名的文件!
    保证目录中的文件都是封闭状态,一旦放入目录中,不能再继续写入!
    每个日志封闭后,才能放入到SpoolingDir,不然agent就故障!
    TailDirSource: 接近实时第读取指定的文件!断点续传功能!
    使用此Source!

    使用TailDirSource

    ②Channel:
    KafkaChannel:
    优点: 基于kafka的副本功能,提供了高可用性!event被存储在kafka中!
    即便agent挂掉或broker挂掉,依然可以让sink从channel中读取数据!

    应用场景:
    ①KafkaChannel和sink和source一起使用,单纯作为channel。
    ②KafkaChannel+拦截器+Source,只要Source把数据写入到kafka就完成
    目前使用的场景!
    ③KafkaChannel+sink,使用flume将kafka中的数据写入到其他的目的地,例如hdfs!

    为例在上述场景工作,KafkaChannel可以配置生产者和消费者的参数!

    配置参数:
    ①在channel层面的参数,例如channel的类型,channel的容量等,需要和之前一样,
    在channel层面配置,例如:a1.channel.k1.type
    ②和kafka集群相关的参数,需要在channel层面配置后,再加上kafka.
    例如: a1.channels.k1.kafka.topic : 向哪个主题发送数据
    a1.channels.k1.kafka.bootstrap.servers: 集群地址
    ③和Produer和Consumer相关的参数,需要加上produer和consumer的前缀:
    例如:a1.channels.k1.kafka.producer.acks=all
    a1.channels.k1.kafka.consumer.group.id=atguigu

    必须的配置:
    type=org.apache.flume.channel.kafka.KafkaChannel
    kafka.bootstrap.servers=
    可选:
    kafka.topic: 生成到哪个主题
    parseAsFlumeEvent=true(默认):
    如果parseAsFlumeEvent=true,kafkaChannel会把数据以flume中Event的结构作为参考,
    把event中的header+body放入ProducerRecord的value中!

    如果parseAsFlumeEvent=false,kafkaChannel会把数据以flume中Event的结构作为参考,
    把event中body放入ProducerRecord的value中!

    a1.channels.k1.kafka.producer.acks=0

    2. 拦截器
    日志数据有两种类型,一种是事件日志,格式 时间戳|{"ap":xx,"cm":{},"et":[{},{}]}
    另一种是启动日志,格式:{"en":"start"}

    在1个source对接两个KafkaChannel时,需要使用MulitPlexing Channel Selector,
    讲启动日志,分配到启动日志所在的Chanel,讲事件日志分配到事件日志所在的Channel!

    MulitPlexing Channel Selector根据event,header中指定key的映射,来分配!

    需要自定义拦截器,根据不同的数据类型,在每个Event对象的header中添加key!

    功能: ①为每个Event,在header中添加key
    ②过滤不符合要求的数据(格式有损坏)
    启动日志: {},验证JSON字符串的完整性,是否以{}开头结尾
    事件日志: 时间戳|{}
    时间戳需要合法:
    a)长度合法(13位)
    b)都是数字
    验证JSON字符串的完整性,是否以{}开头结尾


    一、第二层采集通道的设计分析

    1.目的
    讲已经存储在kafka集群中的数据,使用flume上传到HDFS!

    2. 架构设计
    课件上推荐的:
    数据源在kafka,因此需要使用一个可以对接kafka的source,即kafkaSource
    为了安全起见,选择filechannel
    目的地在hdfs,使用hdfssink

    自己尝试:
    kafkaChannel+hdfssink

    3. 组件分析
    ①kafkaSource:kafkaSource就是kafka的一个消费者线程,可以从指定的主题中读取数据!
    如果希望提供消费的速率,可以配置多个kafkaSource,这些source组成同一个组!

    kafkaSource在工作时,会检查event的header中有没有timestamp属性,如果没有,
    kafkaSource会自动为event添加timestamp=当前kafkaSource所在机器的时间!

    kafkaSource启动一个消费者,消费者在消费时,默认从分区的最后一个位置消费!

    必须的配置:
    type=org.apache.flume.source.kafka.KafkaSource
    kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
    kafka.topics=消费的主题
    kafka.topics.regex=使用正则表达式匹配主题

    可选的配置:
    kafka.consumer.group.id=消费者所在的组id
    batchSize=一次put多少数据,小于10000
    batchDurationMillis=一次put可以最多使用多少时间

    和kafkaConsumer相关的属性:kafka.consumer=consumer的属性名
    例如:kafka.consumer.auto.offset.reset

    ②fileChannel: channel中的event是存储在文件中!比memorychannel可靠,但是效率略低!
    必须的配置:
    type=file
    checkpointDir=checkpoint线程(负责检查文件中哪些event已经被sink消费了,将这些event的文件删除)保存数据的目录!
    useDualCheckpoints=false 是否启动双检查点,如果启动后,会再启动一个备用的checkpoint线程!
    如果改为true,还需要设置backupCheckpointDir(备用的checkpoint线程的工作目录)
    dataDirs=在哪些目录下保存event,默认为~/.flume/file-channel/data,可以是逗号分割的多个目录!

    ③hdfssink: hdfssink将event写入到HDFS!目前只支持生成两种类型的文件: text | sequenceFile,这两种文件都可以使用压缩!
    写入到HDFS的文件可以自动滚动(关闭当前正在写的文件,创建一个新文件)。基于时间、events的数量、数据大小进行周期性的滚动!
    支持基于时间和采集数据的机器进行分桶和分区操作!
    HDFS数据所上传的目录或文件名可以包含一个格式化的转义序列,这个路径或文件名会在上传event时,被自动替换,替换为完整的路径名!
    使用此Sink要求本机已经安装了hadoop,或持有hadoop的jar包!
    配置:
    必须配置:
    type – The component type name, needs to be hdfs
    hdfs.path – HDFS directory path (eg hdfs://namenode/flume/webdata/)

    参考:
    a1.sinks.k1.type = hdfs
    #一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
    a1.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume/%Y%m%d/%H/%M
    #上传文件的前缀
    a1.sinks.k1.hdfs.filePrefix = logs-

    #以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动
    #是否将时间戳向下舍
    a1.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a1.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a1.sinks.k1.hdfs.roundUnit = minute

    #是否使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a1.sinks.k1.hdfs.batchSize = 100

    #以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用!
    #60秒滚动生成一个新的文件
    a1.sinks.k1.hdfs.rollInterval = 10
    #设置每个文件到128M时滚动
    a1.sinks.k1.hdfs.rollSize = 134217700
    #每写多少个event滚动一次
    a1.sinks.k1.hdfs.rollCount = 0
    #以不压缩的文本形式保存数据
    a1.sinks.k1.hdfs.fileType=DataStream


  • 相关阅读:
    UISegmentControl
    UISwitch 开关控件
    UI弹出键盘和收回键盘
    UITextField的属性设置
    UIButton的属性设置
    UILabel 的属性设置
    创建UIView对象
    id和instancetype的异同
    SPOJ FIBPOL
    HDU 6168 Numbers
  • 原文地址:https://www.cnblogs.com/hejunhong/p/13342172.html
Copyright © 2011-2022 走看看