zoukankan      html  css  js  c++  java
  • Flume简介

    1. Flume概念

    • Flume是Cloudera提供的一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统
      • Flume支持在日志系统中定制各种数据发送方,用于收集数据
      • Flume提供对数据进行处理,并写到各种不同的数据接收方的能力(可定制).

    2. Flume架构

    • Flume的核心是把数据从数据源收集过来再送到目的地,为了保证输送的成功,在送到目的地前flume会缓存数据,等到数据传输完成后,再删掉自己缓存的数据
    • Flume的核心角色是agent,flume采集系统就是由一个个agent所链接起来的
    • flume的agent有三个组件组成
      • source
        • 采集组件,数据的收集端,负责跟数据源对接,以获取数据,source会在将数据捕获后,对数据进行格式化,将数据封装到事件(Event)中,然后将事件推入到channel中
        • source 主要分以下几种
          • Avro Source
            • Listens on Avro port and receives events from external Avro client streams.
          • Thrift Source
            • Listens on Thrift port and receives events from external Thrift client streams.
          • Exec Source
            • Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true)
          • Spooling Directory Source
            - This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk
          • 除了以上几种外还有JMS Source 、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。而且Flume还允许自定义source
      • channel
        • 传输通道,可以缓存数据,用于从source将数据传输给sink
        • Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。
      • sink
        • 下沉组件,取出channel的数据,进行相应的存储文件系统、数据库、提交到远程的服务器或者是下一级的agent中
        • Flume也提供了很多sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

    3. Flume 采集系统

    • 简单结构
      • 单个agent采集数据
    • 复杂结构
      • 两个agent互联
      • 多个agent互联
      • 多个channel

    4. Flume的安装

    Flume的安装比较简单

    1. 下载安装包
      官网地址http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.14.2.tar.gz
    2. 规划安装目录
      /bigdata/install
    3. 解压安装包到指定的规划目录,并重命名文件夹
    	tar -zxvf flume-ng-1.6.0-cdh5.14.2.tar.gz -C /bigdata/install
    	mv apache-flume-1.6.0-cdh5.14.2-bin  flume-1.6.0-cdh5.14.2
    
    1. 修改配置文件
      • 将 /bigdata/install/flume-1.6.0-cdh5.14.2/conf/里面的 flume-env.sh.template 重命名为 flume-env.sh
      • 修改flume-env.sh
    export JAVA_HOME=${your java home}
    

    5. 实例

    5.1 采集文件到控制台

    • 需求
      监控某个文件如果又新增内容就把数据采集到控制台显示

    • 修改配置文件
      在flume的安装目录新建一个myconf文件夹用于存放flume的开发配置文件
      vim tail-log.conf

      	#定义一个agent,分别指定source、channel、sink别名
      	a1.sources = r1
      	a1.channels = c1
      	a1.sinks = k1
      	
      	#定义source
      	#指定source的类型为exec,即通过Linux命令来传输数据
      	a1.sources.r1.type = exec
      	#监控一个文件,如果有新的数据产生就采集走
      	a1.sources.r1.command = tail -F /bigdata/data/flume/tail.log
      	#指定source的数据流入的channel中
      	a1.sources.r1.channels = c1
      	
      	#配置channel
      	#指定channel的类型为memory
      	a1.channels.c1.type = memory
      	#指定channel 最多可以存放的容量
      	a1.channels.c1.capacity = 1000
      	#指定事务中source写数据到channel或者sink从channel中取数据的最大的条数
      	a1.channels.c1.transactionCapacity = 100
      	
      	#配置sink
      	#指定sink从哪个channel中获取数据
      	a1.sinks.k1.channel = c1
      	#指定sink的类型为日志格式。结果会打印在控制台中
      	a1.sinks.k1.type = logger
      
    • 启动agent
      进入到node01的/bigdata/install/flume-1.6.0-cdh5.14.2目录执行

      bin/flume-ng agent -n a1 -c myconf -f myconf/tail-log.conf -Dflume.root.logger=info.console  
      
      
      -n指定该agent的名称
      -c表示配置文件所在的目录
      -f指定配置文件
      -D表示指定key=value键值对---这里指定的是启动的日志输出级别
      

    5.2 采集文件到HDFS

    • 需求描述

      监控一个文件如果有新增的内容就把数据采集到HDFS上

    • Flume配置文件

      vim file2Hdfs.conf

      #定义一个agent,分别指定source、channel、sink别名
      a1.sources = r1
      a1.channels = c1
      a1.sinks = k1
      
      #定义source
      #指定source的类型为exec,即通过Linux命令来传输数据
      a1.sources.r1.type = exec
      #监控一个文件,如果有新的数据产生就采集走
      a1.sources.r1.command = tail -F /bigdata/data/flume/tail.log
      #指定source的数据流入的channel中
      a1.sources.r1.channels = c1
      
      #配置channel
      #指定channel的类型为file
      a1.channels.c1.type = file
      a1.channels.c1.checkpointDir = /bigdata/data/flume/flume_checkpoint
      #数据存储所在的目录
      a1.channels.c1.dataDirs = /bigdata/data/flume/flume_data
      
      #配置sink
      a1.sinks.k1.channel = c1
      #指定sink类型为hdfs
      a1.sinks.k1.type = hdfs
      #指定数据收集到hdfs的目录
      a1.sinks.k1.hdfs.path = hdfs://node01:8020/tailFile/%Y-%m-%d/%H%M
      #指定生成文件名的前缀
      a1.sinks.k1.hdfs.filePrefix = events-
      
      #开启时间舍弃
      a1.sinks.k1.hdfs.round = true
      #设置时间舍弃的间隔,即文件的间隔时间为10
      a1.sinks.k1.hdfs.roundValue = 10
      #设置时间舍弃的单位
      a1.sinks.k1.hdfs.roundUnit = minute
      
      # 控制文件个数,滚动生成文件
      #60s或者50字节或者10条数据,谁先满足,就开始滚动生成新文件
      #这个单位太小,仅能在测试的时候使用,并且不要开启太长时间
      a1.sinks.k1.hdfs.rollInterval = 60
      a1.sinks.k1.hdfs.rollSize = 50
      a1.sinks.k1.hdfs.rollCount = 10
      
      #定义每次写入的数据量
      a1.sinks.k1.hdfs.batchSize = 100
      #开始本地时间戳--开启后就可以使用%Y-%m-%d去解析时间
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      
      #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
      a1.sinks.k1.hdfs.fileType = DataStream
      
    • 启动agent

      bin/flume-ng agent -n a1 -c myconf -f myconf/file2Hdfs.conf -Dflume.root.logger=info.console
      

      可以在集群的hdfs的文件系统中 hdfs://node01:8020/tailFile/%Y-%m-%d/%H%M 看到数据的流入

    5.2 采集目录到HDFS

    • 需求描述

      一个目录中不断有新的文件产生,需要把目录中的文件不断地进行数据收集保存到HDFS上

    • Flume配置文件

      vim dir2Hdfs.conf

      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置source
      ##注意:不能往监控目中重复丢同名文件
      #flume会把传输完的文件重名成*.COMPLETED
      a1.sources.r1.type = spooldir
      a1.sources.r1.spoolDir = /bigdata/data/flume/files
      # 是否将文件的绝对路径添加到header
      a1.sources.r1.fileHeader = true
      a1.sources.r1.channels = c1
      
      
      #配置channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      
      #配置sink
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.channel = c1
      a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/%Y-%m-%d/%H%M
      a1.sinks.k1.hdfs.filePrefix = events-
      a1.sinks.k1.hdfs.round = true
      a1.sinks.k1.hdfs.roundValue = 10
      a1.sinks.k1.hdfs.roundUnit = minute
      a1.sinks.k1.hdfs.rollInterval = 60
      a1.sinks.k1.hdfs.rollSize = 50
      a1.sinks.k1.hdfs.rollCount = 10
      a1.sinks.k1.hdfs.batchSize = 100
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
      a1.sinks.k1.hdfs.fileType = DataStream
      
    • 启动agent

      bin/flume-ng agent -n a1 -c myconf -f myconf/dir2Hdfs.conf -Dflume.root.logger=info.console
      

      可以在集群的hdfs的文件系统中 hdfs://node01:8020/tailFile/%Y-%m-%d/%H%M 看到数据的流入

      如果源目录放入了同名的文件回报如下错误

      java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /bigdata/data/flume/files/a.txt.COMPLETED
      	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
      	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
      	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
      	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      
  • 相关阅读:
    贝叶斯推断祭
    libstdc和glibc的一些共享库问题
    nf_conntrack之解决方案
    Too many open files 问题
    Centos系统 上下文切换的检查思路
    GPS坐标转大地坐标
    【转】关于IAP与APP互相跳转的实现
    stm32定时器计数功能
    C库函数——字符串转数字整理
    【转】sscanf函数用法实例
  • 原文地址:https://www.cnblogs.com/William364248886/p/11915304.html
Copyright © 2011-2022 走看看