zoukankan      html  css  js  c++  java
  • Flume 入门--几种不同的Sinks

    主要介绍几种常见Flume的Sink--汇聚点

    1.Logger Sink 

    记录INFO级别的日志,一般用于调试。前面介绍Source时候用到的Sink都是这个类型的Sink

    必须配置的属性:

    属性说明:
                !channel    –    
                !type    –    The component type name, needs to be logger
                maxBytesToLog    16    Maximum number of bytes of the Event body to log

                要求必须在 --conf 参数指定的目录下有 log4j的配置文件
                可以通过-Dflume.root.logger=INFO,console在命令启动时手动指定log4j参数

    案例:前面的例子都是这种类型的Sink

    2.File Roll Sink

    在本地文件系统中存储事件。每隔指定时长生成文件保存这段时间内收集到的日志信息。

    属性说明:
                !channel    –    
                !type    –    类型,必须是"file_roll"
                !sink.directory    –    文件被存储的目录
                sink.rollInterval    30    滚动文件每隔30秒(应该是每隔30秒钟单独切割数据到一个文件的意思)。如果设置为0,则禁止滚动,从而导致所有数据被写入到一个文件。
                sink.serializer    TEXT    Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
                batchSize    100    

    案例:

                  编写配置文件:
    				#命名Agent a1的组件
    				a1.sources  =  r1
    				a1.sinks  =  k1
    				a1.channels  =  c1
    
    				#描述/配置Source
    				a1.sources.r1.type  = http
    				a1.sources.r1.port  = 6666
    
    				#描述Sink
    				a1.sinks.k1.type  = file_roll
    				a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1.6.0-bin/mysink
    				#描述内存Channel
    				a1.channels.c1.type  =  memory
    				a1.channels.c1.capacity  =  1000
    				a1.channels.c1.transactionCapacity  =  100
    
    				#为Channle绑定Source和Sink
    				a1.sources.r1.channels  =  c1
    				a1.sinks.k1.channel  =  c1
    

     启动flume:

    ./flume-ng agent --conf ../conf --conf-file ../conf/template7.conf --name a1 -Dflume.root.logger=INFO,console
    

    测试:

    通过curl命令向目标主机发送请求,就会发现在指定的文件夹下出现记录收集日志的文件

    3.Avro Sink

    是实现多级流动 和 扇出流(1到多) 扇入流(多到1) 的基础。非常重要 但是需要多台机器

    必要属性说明:
                !channel    –    
                !type    –    The component type name, needs to be avro.
                !hostname    –    The hostname or IP address to bind to.
                !port    –    The port # to listen on.

    案例1.多级流动  h1流动到h2

    h2:
    				配置配置文件:
    					#命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1
    					a1.channels=c1
    
    					#描述/配置Source
    					a1.sources.r1.type=avro
    					a1.sources.r1.bind=0.0.0.0
    					a1.sources.r1.port=9988
    					#描述Sink
    					a1.sinks.k1.type=logger
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.channels=c1
    					a1.sinks.k1.channel=c1
    				启动flume:
    					./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
    
    				
    			h1:
    				配置配置文件
    					#命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1
    					a1.channels=c1
    
    					#描述/配置Source
    					a1.sources.r1.type=http
    					a1.sources.r1.port=8888
    					#描述Sink
    					a1.sinks.k1.type=avro
    					a1.sinks.k1.hostname=192.168.242.138
    					a1.sinks.k1.port=9988
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.chafile:///C:/Users/park/Desktop/Day01_Flume/%E6%96%87%E6%A1%A3/Flume%201.6.0%20User%20Guide%20%E2%80%94%20Apache%20Flume.htm#irc-sinknnels=c1
    					a1.sinks.k1.channel=c1
    

     启动flume

    发送http请求到h1:

    curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://192.168.242.133:8888
    

     稍等几秒后,发现h2最终收到了这条消息

    案例2:扇出流(h1扇出到h2,h3)

    h2 h3:
    				配置配置文件:
    					#命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1
    					a1.channels=c1
    
    					#描述/配置Source
    					a1.sources.r1.type=avro
    					a1.sources.r1.bind=0.0.0.0
    					a1.sources.r1.port=9988
    					#描述Sink
    					a1.sinks.k1.type=logger
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.channels=c1
    					a1.sinks.k1.channel=c1
    				启动flume:
    					./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console
    
    			h1:
    				配置配置文件
    					#命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1 k2
    					a1.channels=c1 c2
    
    					#描述/配置Source
    					a1.sources.r1.type=http
    					a1.sources.r1.port=8888
    					#描述Sink
    					a1.sinks.k1.type=avro
    					a1.sinks.k1.hostname=192.168.242.138
    					a1.sinks.k1.port=9988
    					a1.sinks.k2.type=avro
    					a1.sinks.k2.hostname=192.168.242.135
    					a1.sinks.k2.port=9988
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					a1.channels.c2.type=memory
    					a1.channels.c2.capacity=1000
    					a1.channels.c2.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.channels=c1 c2
    					a1.sinks.k1.channel=c1	
    					a1.sinks.k2.channel=c2	
    

    案例3:扇入流()

    m3:
    				编写配置文件:
    					#命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1
    					a1.channels=c1
    					#描述/配置Source
    					a1.sources.r1.type=avro
    					a1.sources.r1.bind=0.0.0.0
    					a1.sources.r1.port=4141
    					#描述Sink
    					a1.sinks.k1.type=logger
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.channels=c1
    					a1.sinks.k1.channel=c1
    				启动flume:
    					./flume-ng agent --conf ../conf --conf-file ../conf/template.conf --name a1 -Dflume.root.logger=INFO,console
    			
    			m1、m2:
    				编写配置文件:
    					#命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1
    					a1.channels=c1
    
    					#描述/配置Source
    					a1.sources.r1.type=http
    					a1.sources.r1.port=8888
    					#描述Sink
    					a1.sinks.k1.type=avro
    					a1.sinks.k1.hostname=192.168.242.135
    					a1.sinks.k1.port=4141
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.channels=c1
    					a1.sinks.k1.channel=c1
    				启动flume:
    					./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
    				m1通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
    					[root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
    			 	m2通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
    					[root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
    				发现m3均能正确收到消息
    

    4、HDFS Sink

    此Sink将事件写入到Hadoop分布式文件系统HDFS中。
                目前它支持创建文本文件和序列化文件。对这两种格式都支持压缩。 这些文件可以分卷,按照指定的时间或数据量或事件的数量为基础。
                它还通过类似时间戳或机器属性对数据进行 buckets/partitions 操作  
                HDFS的目录路径可以包含将要由HDFS替换格式的转移序列用以生成存储事件的目录/文件名。
                使用这个Sink要求hadoop必须已经安装好,以便Flume可以通过hadoop提供的jar包与HDFS进行通信。
                注意,此版本hadoop必须支持sync()调用。

    必要属性说明:
                    !channel    –    
                    !type    –    类型名称,必须是“HDFS”
                    !hdfs.path    –    HDFS 目录路径 (eg hdfs://namenode/flume/webdata/)
                    hdfs.filePrefix    FlumeData    Flume在目录下创建文件的名称前缀
                    hdfs.fileSuffix    –    追加到文件的名称后缀 (eg .avro - 注: 日期时间不会自动添加)
                    hdfs.inUsePrefix    –    Flume正在处理的文件所加的前缀
                    hdfs.inUseSuffix    .tmp    Flume正在处理的文件所加的后缀

    案例:

                    #命名Agent组件
    					a1.sources=r1
    					a1.sinks=k1
    					a1.channels=c1
    
    					#描述/配置Source
    					a1.sources.r1.type=http
    					a1.sources.r1.port=8888
    					#描述Sink
    					a1.sinks.k1.type=hdfs
    					a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp
    					#描述内存Channel
    					a1.channels.c1.type=memory
    					a1.channels.c1.capacity=1000
    					a1.channels.c1.transactionCapacity=1000
    					#为Channel绑定Source和Sink
    					a1.sources.r1.channels=c1
    					a1.sinks.k1.channel=c1
    				           
    

     启动flume:

    ./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console 
    

     测试:通过利用curl给目的主机发送命令,会发现在HDFS中会生成相应的记录文件。

  • 相关阅读:
    python学习之老男孩python全栈第九期_day015知识点总结
    python学习之老男孩python全栈第九期_第二周学习总结
    python学习之老男孩python全栈第九期_day016作业
    python学习之老男孩python全栈第九期_day014知识点总结
    监听文件输入的例子
    python学习之老男孩python全栈第九期_day013知识点总结
    python学习之老男孩python全栈第九期_day015作业_老男孩Python全9期练习题(面试真题模拟)
    python学习之老男孩python全栈第九期_day014作业
    python学习之老男孩python全栈第九期_day011作业
    python学习之老男孩python全栈第九期_day008作业
  • 原文地址:https://www.cnblogs.com/itdyb/p/6270893.html
Copyright © 2011-2022 走看看