  • An Introduction to Flume and Basic Operations

    What is Flume

    Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting massive volumes of log data. Flume lets you plug custom data senders into a logging pipeline to collect data, and it can apply simple processing to that data before writing it out to any of a variety of (customizable) data receivers.

    What Flume does

    • Supports custom data senders in a logging pipeline, used to collect data
    • Applies simple processing to the data and writes it to a variety of (customizable) receivers

    Flume's components

    • Agent: the core component (see the wiring sketch below this list)
      • source: produces or collects the data
      • channel: a transient buffer that holds events between the source and the sink; whether events survive a crash depends on the channel type (a file channel persists them, the memory channel used below does not)
      • sink: forwards events on to their destination
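
    A minimal sketch of how these pieces are wired together in a Flume properties file; the component names (a1, s1, c1, k1) and the port are illustrative, and complete working examples follow below. Note the asymmetry: a source may feed several channels, while a sink drains exactly one.

    # declare the components owned by agent a1
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1

    # give each component a type (placeholder choices)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = localhost
    a1.sources.s1.port = 4444
    a1.channels.c1.type = memory
    a1.sinks.k1.type = logger

    # wiring: a source writes to one or more channels (plural key)...
    a1.sources.s1.channels = c1
    # ...while a sink reads from exactly one channel (singular key)
    a1.sinks.k1.channel = c1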

    Flume workflow diagrams

    • Data flow model
    • Multi-agent (chained) model
    • Consolidation (fan-in) model
    • Hybrid model
    (The original diagrams were images and are not reproduced here; the sketch below illustrates the multi-agent case.)
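
    As a concrete illustration of the multi-agent model, the following sketch chains two agents over Avro. The hostname collector-host, the file path, and port 4141 are placeholders, not values from the original post.

    # agent a1 (on the log-producing machine): tail a local file, forward over Avro
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /var/log/app.log
    a1.sources.r1.channels = c1
    a1.channels.c1.type = memory
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = collector-host
    a1.sinks.k1.port = 4141
    a1.sinks.k1.channel = c1

    # agent a2 (on collector-host): receive Avro events, write them to the log
    a2.sources = r2
    a2.channels = c2
    a2.sinks = k2
    a2.sources.r2.type = avro
    a2.sources.r2.bind = 0.0.0.0
    a2.sources.r2.port = 4141
    a2.sources.r2.channels = c2
    a2.channels.c2.type = memory
    a2.sinks.k2.type = logger
    a2.sinks.k2.channel = c2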

    Installing Flume

    Download and unpack the release

    wget https://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
    tar -zxvf apache-flume-1.8.0-bin.tar.gz
    

    Configure environment variables

    vim ~/.bashrc

    # add the following two lines:
    export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
    export PATH=$PATH:$FLUME_HOME/bin

    # then reload the shell configuration:
    source ~/.bashrc
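
    To confirm that the installation and the environment variables took effect, flume-ng should now be on the PATH and report its version:

    flume-ng version
    # expected to print something like: Flume 1.8.0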
    

    Basic Flume operations

    • Netcat mode
      In the conf directory, create a file named netcat.conf with the following contents:
    # name the agent's source, channel, and sink
    agent.sources = netcatSource
    agent.channels = memoryChannel
    agent.sinks = loggerSink

    # netcat source: listens on a TCP port and turns each received line into an event
    agent.sources.netcatSource.type = netcat
    agent.sources.netcatSource.bind = localhost
    agent.sources.netcatSource.port = 11111
    agent.sources.netcatSource.channels = memoryChannel

    # logger sink: writes events to Flume's own log, handy for testing
    agent.sinks.loggerSink.type = logger
    agent.sinks.loggerSink.channel = memoryChannel

    # in-memory channel holding at most 100 events, 10 per transaction
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 100
    agent.channels.memoryChannel.transactionCapacity = 10
    

    Start an instance. --conf points at Flume's configuration directory, --conf-file at the agent definition, and --name must match the agent name used inside the file (here, agent):

    (py27) [root@master conf]# pwd
    /usr/local/src/apache-flume-1.8.0-bin/conf
    (py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console
    

    Startup succeeds:

    18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
    18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./netcat.conf
    18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
    18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
    18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
    18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
    18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
    18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
    18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
    18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
    18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
    18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
    18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
    18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
    18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
    18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
    18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
    18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
    18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
    18/10/24 11:26:36 INFO source.NetcatSource: Source starting
    18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]
    
    

    Then open a new terminal and send some data:

    (py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    1
    OK
    

    Check the received data:

    18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D                                           1. }
    

    Note: if the telnet tool is missing, install it first: yum install telnet
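
    Alternatively, assuming the nc (netcat) utility is available, it talks to the netcat source in the same way:

    nc localhost 11111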

    • Exec mode
      Create a configuration file exec.conf. Compared with the netcat example, only the source changes (it keeps the name netcatSource; just its type and properties differ):
    agent.sources = netcatSource
    agent.channels = memoryChannel
    agent.sinks = loggerSink

    # exec source: runs the given command and turns each line of its output into an event
    agent.sources.netcatSource.type = exec
    agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
    agent.sources.netcatSource.channels = memoryChannel

    agent.sinks.loggerSink.type = logger
    agent.sinks.loggerSink.channel = memoryChannel

    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 100
    agent.channels.memoryChannel.transactionCapacity = 10
    

    Start the instance

    (py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./exec.conf --name agent -Dflume.root.logger=INFO,console
    

    Once the agent has started, create the exec.log file referenced in the configuration:

    (py27) [root@master test_data]# ls
    exec.log
    (py27) [root@master test_data]# pwd
    /home/master/FlumeTest/test_data
    (py27) [root@master test_data]#
    

    Then use echo to simulate log lines being produced:

    (py27) [root@master test_data]# echo 'Hello World!!!' >> exec.log
    

    Check the received log:

    18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21       Hello World!!! }
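
    One caveat: an exec source running tail -f gives no delivery guarantees; lines written while the agent is down are lost, and plain tail -f does not follow a rotated file. Since Flume 1.7 the TAILDIR source is the more robust choice. A minimal sketch of the source section only (the positionFile path is illustrative):

    agent.sources.netcatSource.type = TAILDIR
    agent.sources.netcatSource.filegroups = f1
    agent.sources.netcatSource.filegroups.f1 = /home/master/FlumeTest/test_data/exec.log
    # TAILDIR records how far it has read, so it resumes correctly after a restart
    agent.sources.netcatSource.positionFile = /home/master/FlumeTest/taildir_position.json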
    

    Saving the logs to HDFS
    Modify the configuration, replacing the logger sink with an HDFS sink (saved here as flume_exec_hdfs.conf, matching the startup command below):

    agent.sources = netcatSource
    agent.channels = memoryChannel
    agent.sinks = loggerSink

    agent.sources.netcatSource.type = exec
    agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
    agent.sources.netcatSource.channels = memoryChannel

    # HDFS sink: bucket events into per-minute directories, rolling files aggressively for the demo
    agent.sinks.loggerSink.type = hdfs
    agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
    agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
    # round the timestamp used in the path down to 1-minute boundaries
    agent.sinks.loggerSink.hdfs.round = true
    agent.sinks.loggerSink.hdfs.roundValue = 1
    agent.sinks.loggerSink.hdfs.roundUnit = minute
    # roll to a new file every 3 seconds, 20 bytes, or 5 events, whichever comes first
    agent.sinks.loggerSink.hdfs.rollInterval = 3
    agent.sinks.loggerSink.hdfs.rollSize = 20
    agent.sinks.loggerSink.hdfs.rollCount = 5
    # exec-source events carry no timestamp header, so use local time for the path escapes
    agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
    # DataStream writes plain text rather than a SequenceFile
    agent.sinks.loggerSink.hdfs.fileType = DataStream
    agent.sinks.loggerSink.channel = memoryChannel

    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 100
    agent.channels.memoryChannel.transactionCapacity = 10
    

    Then start the instance:

    (py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console
    

    You can then watch it write the log entries from exec.log to HDFS:

    18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
    18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
    18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
    18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.
    

    Looking in HDFS, we can see the log's contents:

    (py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
    Found 1 items
    -rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
    (py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
    Hello World!!!
    

    Now write some new log lines and check again. Because the sink rounds the path to the minute, the new events land in a fresh directory (0956):

    //write new log entries
    (py27) [root@master test_data]# echo 'test001' >> exec.log               
    (py27) [root@master test_data]# echo 'test002' >> exec.log
    //list the HDFS directory
    (py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
    Found 2 items
    drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
    drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
    (py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
    Found 1 items
    -rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
    (py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
    test001
    test002
    
    • Failover example
      This requires three machines: master, slave1, and slave2. Configure and start an agent instance on each; the agent on master ships the log entries, while slave1 and slave2 receive them.
      master configuration
    agent.sources = netcatSource
    agent.channels = memoryChannel
    agent.sinks = loggerSink1 loggerSink2
    
    agent.sinkgroups = group
    
    agent.sources.netcatSource.type = exec
    agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
    agent.sources.netcatSource.channels = memoryChannel
    
    # two Avro sinks, one per receiving slave
    agent.sinks.loggerSink1.type = avro
    agent.sinks.loggerSink1.hostname = slave1
    agent.sinks.loggerSink1.port = 52020
    agent.sinks.loggerSink1.channel = memoryChannel
    
    agent.sinks.loggerSink2.type = avro
    agent.sinks.loggerSink2.hostname = slave2
    agent.sinks.loggerSink2.port = 52020
    agent.sinks.loggerSink2.channel = memoryChannel
    
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 10000
    agent.channels.memoryChannel.transactionCapacity = 1000
    
    agent.sinkgroups.group.sinks = loggerSink1 loggerSink2

    # failover sink processor: all events go to the highest-priority sink that is
    # alive (priorities are set per sink via processor.priority.<sinkName>);
    # maxpenalty caps the backoff, in milliseconds, applied to a failed sink
    agent.sinkgroups.group.processor.type = failover
    agent.sinkgroups.group.processor.priority.loggerSink1 = 10
    agent.sinkgroups.group.processor.priority.loggerSink2 = 1
    agent.sinkgroups.group.processor.maxpenalty = 10000
    

    slave1 configuration

    agent.sources = netcatSource
    agent.channels = memoryChannel
    agent.sinks = loggerSink
    
    agent.sources.netcatSource.type = avro
    agent.sources.netcatSource.bind = slave1
    agent.sources.netcatSource.port = 52020
    agent.sources.netcatSource.channels = memoryChannel
    
    agent.sinks.loggerSink.type = logger
    agent.sinks.loggerSink.channel = memoryChannel
    
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 10000
    agent.channels.memoryChannel.transactionCapacity = 1000
    

    slave2 configuration (identical to slave1 except for the bind host)

    agent.sources = netcatSource
    agent.channels = memoryChannel
    agent.sinks = loggerSink
    
    agent.sources.netcatSource.type = avro
    agent.sources.netcatSource.bind = slave2
    agent.sources.netcatSource.port = 52020
    agent.sources.netcatSource.channels = memoryChannel
    
    agent.sinks.loggerSink.type = logger
    agent.sinks.loggerSink.channel = memoryChannel
    
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 10000
    agent.channels.memoryChannel.transactionCapacity = 1000
    

    Start the agents on master, slave1, and slave2, then write a log entry on master and watch which slave receives it:

    //master
    (py27) [root@master test_data]# echo 'hello' >> exec.log  
    //slave1
    18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
    //slave2
    18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726                             
    

    slave1 receives the data, as it has the higher priority (10 vs 1). Now kill slave1's agent and send another log entry:

    //master
    (py27) [root@master test_data]# echo '11111' >> exec.log      
    //slave2
    18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
    18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }
    
    

    Then restart slave1's agent and send again; because slave1 has the higher priority, traffic fails back to it:

    //master
    (py27) [root@master test_data]# echo '22222' >> exec.log      
    //slave1
    18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32                                  22222 }
    //slave2
    18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
    18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31                                  11111 }
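
    As a side note, the same sink-group mechanism supports load balancing as an alternative to failover; in the master configuration only the processor lines change (selector may be round_robin or random):

    agent.sinkgroups.group.processor.type = load_balance
    agent.sinkgroups.group.processor.selector = round_robin
    agent.sinkgroups.group.processor.backoff = true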
    

