zoukankan      html  css  js  c++  java
  • Flume之Sink

    一、Logger Sink

    记录指定级别(比如INFO,DEBUG,ERROR等)的日志,通常用于调试,要求在 --conf(-c )参数指定的目录下有log4j的配置文件。根据设计,logger sink将体内容限制为16字节,从而避免屏幕充斥着过多的内容。如果想要查看调试的完整内容,那么你应该使用其他的sink,也许可以使用file_roll sink,它会将日志写到本地文件系统中。

    可配置项说明:

    配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=s1
     
    #描述/配置a1的r1
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=44444
     
    #描述a1的s1
    a1.sinks.s1.type=logger
    #描述a1的c1
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    二、File Roll Sink

    在本地系统中存储事件。每隔指定时长生成文件保存这段时间内收集到的日志信息。

    可配置参数说明:

    配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source1
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=8888
     
    #描述sink
    a1.sinks.s1.type=file_roll
    a1.sinks.s1.sink.directory=/home/work/rolldata
    a1.sinks.s1.sink.rollInterval=60
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    创建指定的文件目录 /home/work/rolldata,启动测试:

    ../bin/flume-ng agent -c ./ -f ./template.conf -n a1

    三、Avro Sink

    是实现多级流动、扇出流(1到多) 扇入流(多到1) 的基础。

    可配置项说明:

    3.1  多级流动需求:

    让01机的flume通过netcat source源接收数据,然后通过avro sink 发给02机==》02机的flume利用avro source源收数据,然后通过avro sink 传给03机==》03机通过avro source源收数据,通过logger sink 输出到控制台上(本例中,02机的ip:192.168.234.212  || 03机的ip:192.168.234.213)

    实现步骤:

    1. 准备三台虚拟机,并安装好flume(关闭每台机器的防火墙)
    2. 配置每台flume的配置文件
    3. 启动测试

    01机的配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=8888
     
    #描述sink
    a1.sinks.s1.type=avro
    a1.sinks.s1.hostname=192.168.234.212
    a1.sinks.s1.port=9999
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    02机的配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source
    a1.sources.r1.type=avro
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=9999
     
    #描述sink
    a1.sinks.s1.type=avro
    a1.sinks.s1.hostname=192.168.234.213
    a1.sinks.s1.port=9999
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    03机的配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source1
    a1.sources.r1.type=avro
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=9999
     
    #描述sink
    a1.sinks.s1.type=logger
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    3.2扇出流案例需求

    01机发出的数据,让02,03来接收

    实现步骤:

    1. 准备三台虚拟机,并安装好flume(关闭每台机器的防火墙)
    2. 配置每台flume的配置文件
    3. 启动测试

    01机的配置文件:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1 s2
    a1.channels=c1 c2
     
    #描述/配置a1的source1
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=8888
     
    #描述sink
    a1.sinks.s1.type=avro
    a1.sinks.s1.hostname=192.168.234.212
    a1.sinks.s1.port=9999
     
    a1.sinks.s2.type=avro
    a1.sinks.s2.hostname=192.168.234.213
    a1.sinks.s2.port=9999
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=1000
    a1.channels.c2.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1 c2
    a1.sinks.s1.channel=c1
    a1.sinks.s2.channel=c2

    02,03配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source1
    a1.sources.r1.type=avro
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=9999
     
    #描述sink
    a1.sinks.s1.type=logger
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    3.3 扇入案列需求

    02,03机收到的数据都发往01

    02,03的配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source1
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=8888
     
    #描述sink
    a1.sinks.s1.type=avro
    a1.sinks.s1.hostname=192.168.234.163
    a1.sinks.s1.port=9999
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    01机的配置示例:

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source1
    a1.sources.r1.type=avro
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=9999
     
    #描述sink
    a1.sinks.s1.type=logger
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    四、HDFS Sink

    此Sink将事件写入到Hadoop分布式文件系统HDFS中。目前它支持创建文本文件和序列化文件。对这两种格式都支持压缩。这些文件可以分卷,按照指定的时间或数据量或事件的数量为基础。它还通过类似时间戳或机器属性对数据进行 buckets/partitions 操作。

    HDFS的目录路径可以包含将要由HDFS替换格式的转移序列用以生成存储事件的目录/文件名。使用这个Sink要求haddop必须已经安装好,以便Flume可以通过hadoop提供的jar包与HDFS进行通信。

    可配置项说明

    配置项

    说明

    channel

     

    type

    hdfs

    hdfs.path

    HDFS 目录路径 (hdfs://namenode/flume/webdata/)

    hdfs.inUseSuffix

    .tmp        Flume正在处理的文件所加的后缀

    hdfs.rollInterval

    30        Number of seconds to wait before 
    举例:如果flume需要40s,30s=>1个文件  10s=>30s 1个文件

    hdfs.rollSize

    1024        File size to trigger roll, in bytes (0: never roll based on file size)

    hdfs.rollCount

    10        Number of events written to file before it rolled (0 = never roll based on number of events)

    hdfs.fileType

    SequenceFile        File format: currently SequenceFile, DataStream or CompressedStream

    hdfs.retryInterval

    80        Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.

     

    #配置Agent a1 的组件
    a1.sources=r1
    a1.sinks=s1
    a1.channels=c1
     
    #描述/配置a1的source1
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=8888
     
    #描述sink
    a1.sinks.s1.type=hdfs
    a1.sinks.s1.hdfs.path=hdfs://192.168.234.21:9000/flume
    #处理数据的类型,DataStream为普通的文本类型
    a1.sinks.s1.hdfs.fileType=DataStream
     
    #描述内存channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity=100
     
    #位channel 绑定 source和sink
    a1.sources.r1.channels=c1
    a1.sinks.s1.channel=c1

    报错是因为flume缺少相关hadoop的依赖jar包,找到以下的jar包,放到flume的lib目录下即可。

    commons-configuration-1.6.jar

    hadoop-auth-2.5.2.jar

    hadoop-common-2.5.2.jar

    hadoop-hdfs-2.5.2.jar

    hadoop-mapreduce-client-core-2.5.2.jar

    但是一个一个找特别麻烦,所以解决办法是将hadoop的jar包都拷贝到flume的lib目录下。进入到hadoop安装目录的share目录下的hadoop目录,执行:

    scp common/*   common/lib/*   hdfs/*   hdfs/lib/*   mapreduce/*   mapreduce/lib/*   tools/lib/* 192.168.234.163:/home/software/flume/lib/
  • 相关阅读:
    /etc/sysctl.conf 控制内核相关配置文件
    python 并发编程 非阻塞IO模型
    python 并发编程 多路复用IO模型
    python 并发编程 异步IO模型
    python 并发编程 阻塞IO模型
    python 并发编程 基于gevent模块 协程池 实现并发的套接字通信
    python 并发编程 基于gevent模块实现并发的套接字通信
    python 并发编程 io模型 目录
    python 并发编程 socket 服务端 客户端 阻塞io行为
    python 并发编程 IO模型介绍
  • 原文地址:https://www.cnblogs.com/johnvwan/p/15656183.html
Copyright © 2011-2022 走看看