zoukankan      html  css  js  c++  java
  • 大数据篇:Flume

    大数据篇:Flume

    flume.apache.org

    Flume是什么?

    Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

    如果没有Flume

    数据的采集发送怎么处理呢?处理到哪里呢?Flume最主要的作用就是实时读取服务器本地磁盘数据,写入Hdfs或Kafka等中间件。

    1 基础架构

    • Agent主要由:source、channel、sink三个组件组成.

    • Source:

      • 从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro(Flume对接Flume),Exec(命令行如tail -f),Taildir(目录本地文件),Kafka等。
    • Channel:

      • channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,并且它可以和任意数量的source和sink链接,支持的类型有: JDBC , File System,Memory等。
    • sink:

      • sink将数据channels消费数据(events)并将其传递给目标地,目标地可能是另一个sink,Flume提供多种数据发送的方式,比如Avro,HDFS,Hive,Kafka。
    • Event

      • Flume以事件的形式将数据从源头传送到最终的目的
      • Event是数据传输的基本单元
      • Event由Header和Body两部分组成,Header用来存放该Event的一些属性(K-V结构),Body存放数据(Byte Array结构)。

    2 案例演示

    2.1 netcat->Memory->Logger

    1. 通过netcat工具向本机44444端口发送数据
    2. Flume监控本机44444端口读取数据
    3. Flume将获取数据打印到控制台
    • 安装netcat工具
    yum -y install nc
    #监听44444端口(服务端)
    nc -lk 44444
    #监听44444端口(客户端)
    nc localhost 44444
    #互相发送数据接收即可
    
    • 创建Agent配置文件flume-netcat-logger.conf
    vim flume-netcat-logger.conf
    #--->
    # 给agent组件命名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # sources相关配置
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # sinks相关配置
    a1.sinks.k1.type = logger
    
    # channels相关配置
    a1.channels.c1.type = memory
    #事件容量
    a1.channels.c1.capacity = 1000
    #一次传输多少事件
    a1.channels.c1.transactionCapacity = 100
    
    # 绑定三个组件
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    #---<
    
    • 启动flume
    #普通写法
    flume-ng agent --conf /etc/flume-ng/conf --conf-file flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
    #简写
    flume-ng agent -c /etc/flume-ng/conf -f flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
    

    2.2 .log本地文件->Memory->Hdfs

    1. 生成本地日志文件
    2. Flume获取本地数据文件
    3. Flume将获取的文件发送到Hdfs
    • 创建Agent配置文件flume-log-hdfs.conf
    vim flume-log-hdfs.conf
    #--->
    # 给agent组件命名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # sources相关配置
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /root/flume-test/logs/a.log
    
    # sinks相关配置
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/events/%Y-%m-%d/%H-%M
    a1.sinks.k1.hdfs.filePrefix = events-
    #文件夹滚动一分钟创建一个新文件夹
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 1
    a1.sinks.k1.hdfs.roundUnit = minute
    #文件滚动时间10S 128M 2条 生成新文件
    a1.sinks.k1.hdfs.rollInterval = 10	
    a1.sinks.k1.hdfs.rollSize = 134210000	
    a1.sinks.k1.hdfs.rollCount = 2
    #积累多少Event才刷到hdfs
    a1.sinks.k1.hdfs.batchSize = 2
    #开启时间滚动需要
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #DataStream不会压缩输出文件
    a1.sinks.k1.hdfs.fileType = DataStream 
    
    # channels相关配置
    a1.channels.c1.type = memory
    #事件容量
    a1.channels.c1.capacity = 1000
    #一次传输多少事件
    a1.channels.c1.transactionCapacity = 100
    
    # 绑定三个组件
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    #---<
    
    • 启动flume
    flume-ng agent -c /etc/flume-ng/conf -f flume-log-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
    
    • 创建本地文件
    mkdir /root/flume-test/logs
    echo "1" > /root/flume-test/logs/a.log
    echo "2" >> /root/flume-test/logs/a.log
    echo "3" >> /root/flume-test/logs/a.log
    echo "4" >> /root/flume-test/logs/a.log
    #根据上面设置的间隔时间进行效果测试
    echo "5" >> /root/flume-test/logs/a.log
    echo "6" >> /root/flume-test/logs/a.log
    echo "7" >> /root/flume-test/logs/a.log
    echo "8" >> /root/flume-test/logs/a.log
    
    

    2.3 本地文件夹->Memory->Hdfs

    1. 生成本地文件夹及文件数据
    2. Flume获取本地数据文件
    3. Flume将获取的文件发送到Hdfs
    • 创建Agent配置文件flume-file-hdfs.conf
    vim flume-file-hdfs.conf
    #--->
    # 给agent组件命名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # sources相关配置
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/flume-test/dirlogs
    #忽略文件
    a1.sources.r1.ignorePattern = ([^ ]*.txt)
    
    # sinks相关配置
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/dirlogs/%Y-%m-%d/%H-%M
    a1.sinks.k1.hdfs.filePrefix = log-
    #文件夹滚动一分钟创建一个新文件夹
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 1
    a1.sinks.k1.hdfs.roundUnit = minute
    #文件滚动时间10S 128M 2条 生成新文件
    a1.sinks.k1.hdfs.rollInterval = 10	
    a1.sinks.k1.hdfs.rollSize = 134210000	
    a1.sinks.k1.hdfs.rollCount = 2
    #积累多少Event才刷到hdfs
    a1.sinks.k1.hdfs.batchSize = 2
    #开启时间滚动需要
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #DataStream不会压缩输出文件
    a1.sinks.k1.hdfs.fileType = DataStream 
    
    # channels相关配置
    a1.channels.c1.type = memory
    #事件容量
    a1.channels.c1.capacity = 1000
    #一次传输多少事件
    a1.channels.c1.transactionCapacity = 100
    
    # 绑定三个组件
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    #---<
    
    • 启动flume
    flume-ng agent -c /etc/flume-ng/conf -f flume-file-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
    
    • 创建本地文件
    mkdir /root/flume-test/dirlogs
    echo "1" > /root/flume-test/dirlogs/a.log
    echo "2" >> /root/flume-test/dirlogs/a.log
    echo "3" > /root/flume-test/dirlogs/a.txt
    echo "4" >> /root/flume-test/dirlogs/a.txt
    #根据上面设置的间隔时间进行效果测试
    echo "5" > /root/flume-test/dirlogs/b.log
    echo "6" >> /root/flume-test/dirlogs/b.log
    #采用cp直接放入一个写好的文件测试效果
    

    不能在监控目录中创建并持续修改文件

    上传完成的文件以.COMPLETED结尾

    被监控文件夹500毫秒扫描一次文件变动

    2.4 本地文件夹->Memory->Logger

    监控目录下的实时追加文件

    1. 生成本地文件夹及文件数据
    2. Flume获取本地数据文件
    3. Flume将获取数据打印到控制台
    • 创建Agent配置文件flume-files-logger.conf
    vim flume-files-logger.conf
    #--->
    # 给agent组件命名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # sources相关配置
    a1.sources.r1.type = TAILDIR
    #位置信息
    a1.sources.r1.positionFile = /root/flume-test/taildir_position.json
    a1.sources.r1.filegroups = f1 f2
    a1.sources.r1.filegroups.f1 = /root/flume-test/test1/a.log
    a1.sources.r1.headers.f1.headerKey1 = value1
    a1.sources.r1.filegroups.f2 = /root/flume-test/test2/.*log.*
    a1.sources.r1.headers.f2.headerKey1 = value2
    a1.sources.r1.headers.f2.headerKey2 = value2-2
    a1.sources.r1.fileHeader = true
    a1.sources.ri.maxBatchCount = 1000
    
    # sinks相关配置
    a1.sinks.k1.type = logger
    
    # channels相关配置
    a1.channels.c1.type = memory
    #事件容量
    a1.channels.c1.capacity = 1000
    #一次传输多少事件
    a1.channels.c1.transactionCapacity = 100
    
    # 绑定三个组件
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    #---<
    
    • 启动flume
    flume-ng agent -c /etc/flume-ng/conf -f flume-files-logger.conf -n a1 -Dflume.root.logger=INFO,console
    
    • 创建本地文件
    mkdir /root/flume-test/test1/
    mkdir /root/flume-test/test2/
    echo "1" > /root/flume-test/test1/a.log
    echo "2" >> /root/flume-test/test1/a.log
    echo "3" >> /root/flume-test/test1/a.log
    #根据上面设置的间隔时间进行效果测试
    echo "5" > /root/flume-test/test2/b.log
    echo "6" >> /root/flume-test/test2/b.log
    echo "7" >> /root/flume-test/test2/b.log
    #停止flume,追加数据,在启动测试断点续传效果。
    

    2.5 netcat->Memory->kafka

    1. 生成本地文件夹及文件数据
    2. Flume获取本地数据文件
    3. Flume将获取数据打印到控制台
    • 创建Agent配置文件flume-files-kafka.conf
    vim flume-file-kafka.conf
    #--->
    # 给agent组件命名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # sources相关配置
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # sinks相关配置
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = top-test
    a1.sinks.k1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    
    # channels相关配置
    a1.channels.c1.type = memory
    #事件容量
    a1.channels.c1.capacity = 1000
    #一次传输多少事件
    a1.channels.c1.transactionCapacity = 100
    
    # 绑定三个组件
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    #---<
    
    • 启动flume
    flume-ng agent -c /etc/flume-ng/conf -f flume-file-kafka.conf -n a1 -Dflume.root.logger=INFO,console
    
    • 启动kafka消费者
    kafka-console-consumer --topic top-test --bootstrap-server cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092  --from-beginning --group g1
    
    • 使用netcat
    nc localhost 44444
    


  • 相关阅读:
    001 课程定位和目标
    003 Python基本语法元素
    Oracle之用户和表空间
    基于SecureCRT的测试环境的克隆的linux/vi相关命令
    今日总结(linux和plsql)
    String小案例(**)、包装类型和普通数据类型的转换(拆装箱)
    Java基础再复习(继承、多态、方法内部类**、HashMap用法**、参数传递**)
    Servlet向JSP过渡
    控制层和ajax用法的详解
    注册页面的JSON响应方式详细分析(与前端页面交互方式之一)
  • 原文地址:https://www.cnblogs.com/ttzzyy/p/12638360.html
Copyright © 2011-2022 走看看