zoukankan      html  css  js  c++  java
  • flume集群日志收集

    一、Flume简介

      Flume是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。其核心为agent,agent是一个java进程,运行在日志收集节点。

    agent里面包含3个核心组件:source、channel、sink。
       source组件是专用于收集日志的,可以处理各种类型各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义,同时 source组件把数据收集

    以后,临时存放在channel中。

      channel组件是在agent中专用于临时存储数据的,可以存放在memory、jdbc、file、自定义等。channel中的数据只有在sink发送成功之后才会被删除。

      sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

      在整个数据传输过程中,流动的是event。事务保证是在event级别。flume可以支持多级flume的agent,支持扇入(fan-in)、扇出(fan-out)。

    二、环境准备

      1)hadoop集群(楼主用的版本2.7.3,共6个节点,可参考http://www.cnblogs.com/qq503665965/p/6790580.html

      2)flume集群规划:

    HOST

    作用

    方式

    路径

    hadoop01

    agent

    spooldir

    /home/hadoop/logs

    hadoop05

    collector

    HDFS

    /logs

    hadoop06

    collector

    HDFS

    /logs

      其基本结构官网给出了更加具体的说明,这里楼主就直接copy过来了:

    三、集群配置

      1)系统环境变量配置  

    1 export FLUME_HOME=/home/hadoop/apache-flume-1.7.0-bin
    2 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$FLUME_HOME/bin

      记得 source /etc/profile

      2)flume JDK环境  

    1 mv flume-env.sh.templete flume-env.sh
    2 vim flume-env.sh
    3 export JAVA_HOME=/usr/jdk1.7.0_60//增加JDK安装路径即可

      3)hadoop01中flume配置

      在conf目录增加配置文件 flume-client ,内容为:  

     1 #agent1名称
     2 agent1.channels = c1
     3 agent1.sources = r1
     4 agent1.sinks = k1 k2
     5 
     6 #sink组名称
     7 agent1.sinkgroups = g1 
     8 
     9 #set channel
    10 agent1.channels.c1.type = memory
    11 agent1.channels.c1.capacity = 1000
    12 agent1.channels.c1.transactionCapacity = 100
    13 
    14 agent1.sources.r1.channels = c1
    15 agent1.sources.r1.type = spooldir
    16 #日志源
    17 agent1.sources.r1.spoolDir =/home/hadoop/logs
    18 
    19 agent1.sources.r1.interceptors = i1 i2
    20 agent1.sources.r1.interceptors.i1.type = static
    21 agent1.sources.r1.interceptors.i1.key = Type
    22 agent1.sources.r1.interceptors.i1.value = LOGIN
    23 agent1.sources.r1.interceptors.i2.type = timestamp
    24 
    25 # 设置sink1
    26 agent1.sinks.k1.channel = c1
    27 agent1.sinks.k1.type = avro
    28 #sink1所在主机
    29 agent1.sinks.k1.hostname = hadoop05
    30 agent1.sinks.k1.port = 52020
    31 
    32 #设置sink2
    33 agent1.sinks.k2.channel = c1
    34 agent1.sinks.k2.type = avro
    35 #sink2所在主机
    36 agent1.sinks.k2.hostname = hadoop06
    37 agent1.sinks.k2.port = 52020
    38 
    39 #设置sink组包含sink1,sink2
    40 agent1.sinkgroups.g1.sinks = k1 k2
    41 
    42 #高可靠性
    43 agent1.sinkgroups.g1.processor.type = failover
    44 #设置优先级
    45 agent1.sinkgroups.g1.processor.priority.k1 = 10
    46 agent1.sinkgroups.g1.processor.priority.k2 = 1
    47 agent1.sinkgroups.g1.processor.maxpenalty = 10000

      4)hadoop05配置

     1 #设置 Agent名称
     2 a1.sources = r1
     3 a1.channels = c1
     4 a1.sinks = k1
     5 
     6 #设置channlels
     7 a1.channels.c1.type = memory
     8 a1.channels.c1.capacity = 1000
     9 a1.channels.c1.transactionCapacity = 100
    10 
    11 # 当前节点信息
    12 a1.sources.r1.type = avro
    13 #绑定主机名
    14 a1.sources.r1.bind = hadoop05
    15 a1.sources.r1.port = 52020
    16 a1.sources.r1.interceptors = i1
    17 a1.sources.r1.interceptors.i1.type = static
    18 a1.sources.r1.interceptors.i1.key = Collector
    19 a1.sources.r1.interceptors.i1.value = hadoop05
    20 a1.sources.r1.channels = c1
    21 
    22 #sink的hdfs地址
    23 a1.sinks.k1.type=hdfs
    24 a1.sinks.k1.hdfs.path=/logs
    25 a1.sinks.k1.hdfs.fileType=DataStream
    26 a1.sinks.k1.hdfs.writeFormat=TEXT
    27 #没1K产生文件
    28 a1.sinks.k1.hdfs.rollInterval=1
    29 a1.sinks.k1.channel=c1
    30 #文件后缀
    31 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

      5)hadoop06配置

     1 #设置 Agent名称
     2 a1.sources = r1
     3 a1.channels = c1
     4 a1.sinks = k1
     5 
     6 #设置channel
     7 a1.channels.c1.type = memory
     8 a1.channels.c1.capacity = 1000
     9 a1.channels.c1.transactionCapacity = 100
    10 
    11 # 当前节点信息
    12 a1.sources.r1.type = avro
    13 #绑定主机名
    14 a1.sources.r1.bind = hadoop06
    15 a1.sources.r1.port = 52020
    16 a1.sources.r1.interceptors = i1
    17 a1.sources.r1.interceptors.i1.type = static
    18 a1.sources.r1.interceptors.i1.key = Collector
    19 a1.sources.r1.interceptors.i1.value = hadoop06
    20 a1.sources.r1.channels = c1
    21 #设置sink的hdfs地址目录
    22 a1.sinks.k1.type=hdfs
    23 a1.sinks.k1.hdfs.path=/logs
    24 a1.sinks.k1.hdfs.fileType=DataStream
    25 a1.sinks.k1.hdfs.writeFormat=TEXT
    26 a1.sinks.k1.hdfs.rollInterval=1
    27 a1.sinks.k1.channel=c1
    28 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

    四、启动flume集群

      1)启动collector,即hadoop05,hadoop06 

    1 flume-ng agent -n a1 -c conf -f flume-server -Dflume.root.logger=DEBUG,console

      2)启动agent,即hadoop01

    flume-ng agent -n a1 -c conf -f flume-client -Dflume.root.logger=DEBUG,console

      agent启动后,hadoop05,hadoop06的控制台可看到如下打印信息:

      

    五、日志收集测试

      1)启动zookeeper集群(未搭建zookeeper的同学可以忽略)

      2)启动hdfs start-dfs.sh 

      3)模拟网站日志,楼主这里随便弄的测试数据

      4)上传到/hadoop/home/logs

      hadoop01输出:

       hadoop05输出:

     

      由于hadoop05设置的优先级高于hadoop06,因此hadoop06无日志写入。

      我们再看hdfs上,是否成功上传了日志文件:

      

    六、高可用性测试

      由于楼主设置的hadoop05的优先级要高于hadoop06,这也是上面的日志收集hadoop05输出而不是hadoop06输出的原因。现在我们干掉优先级高的hadoop05,看hadoop06是否能正常进行日志采集工作。

      

      我们向日志源添加一个测试日志文件:

      

      hadoop06输出:

       查看hdfs:

      

      好了!flume集群配置及日志收集就介绍到这里,下次楼主楼主会具体介绍利用mapreduce对日志进行清洗,并存储到hbase相关的内容。

      

  • 相关阅读:
    信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言—— 1044:判断是否为两位数
    1043:整数大小比较
    1043:整数大小比较
    1043:整数大小比较
    排序算法 —— 插入排序
    排序算法 —— 插入排序
    排序算法 —— 插入排序
    C#中如何获取一个字体的宽度值(像素单位)
    visual studio 恢复默认界面
    visual studio 恢复默认界面
  • 原文地址:https://www.cnblogs.com/qq503665965/p/6853209.html
Copyright © 2011-2022 走看看