  • Flume collection examples

    1. Collecting a directory to HDFS

    Collection requirement: a particular directory on a server keeps producing new files, and every new file that appears must be collected into HDFS.

    Based on this requirement, first define the following three key elements:

    1. Collection source (source): a monitored file directory, i.e. spooldir
    2. Sink target (sink): the HDFS file system, i.e. the hdfs sink
    3. Channel between source and sink: either a file channel or a memory channel

    Configuration file:

    # Name the three components
    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Configure the source component
    agent1.sources.source1.type = spooldir
    agent1.sources.source1.spoolDir = /home/hadoop/logs/
    agent1.sources.source1.fileHeader = false
    
    # Configure the interceptor
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Configure the sink component
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize = 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat = Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1

    Channel parameter explanations:

    capacity: the maximum number of events the channel can store

    transactionCapacity: the maximum number of events that can be taken from the source or delivered to the sink in a single transaction

    keep-alive: the time allowed for adding an event to the channel or removing one from it
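
    The agent name passed with -n must match the property prefix, agent1 here. A minimal start command, assuming the configuration above is saved as conf/spooldir-hdfs.conf (an illustrative file name):

    bin/flume-ng agent -c conf -f conf/spooldir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console

    Note that the hdfs sink's batchSize (100) stays below the channel's transactionCapacity (600); a batch size larger than the transaction capacity would make the sink's transactions fail.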

    2. Collecting a file to HDFS

    Based on this requirement, first define the following three key elements:

    • Collection source (source): monitors file content updates, i.e. exec 'tail -F file' (see the reliability note below)
    • Sink target (sink): the HDFS file system, i.e. the hdfs sink
    • Channel between source and sink: either a file channel or a memory channel
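
    A note on reliability: per the Flume user guide, the exec source offers no delivery guarantee; if the agent stops or the channel fills up while tail -F is running, events can be lost. That is acceptable for this walkthrough, while the spooling directory source (or the TAILDIR source available since Flume 1.7) is the more robust choice in production.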

    Configuration file:

    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Describe/configure tail -F source1
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
    agent1.sources.source1.channels = channel1
    
    #configure host for source
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Describe sink1
    agent1.sinks.sink1.type = hdfs
    #a1.sinks.k1.channel = c1
    agent1.sinks.sink1.hdfs.path = hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize = 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat = Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    agent1.sinks.sink1.hdfs.round = true
    agent1.sinks.sink1.hdfs.roundValue = 10
    agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
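
    As in the first example, the agent is started with flume-ng. A sketch, assuming the configuration is saved as conf/exec-hdfs.conf (an illustrative file name):

    bin/flume-ng agent -c conf -f conf/exec-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console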

     

    tail-hdfs.conf
    
    Use the tail command to capture data and sink it to HDFS
    
    
    mkdir /home/admin/log
    
    while true
    do
    echo 111111 >> /home/admin/log/test.log
    sleep 0.5
    done
    
    tail -F test.log
    
    The data is collected into HDFS, and the target directory in HDFS does not need to be created manually.
    
    Check whether HDFS is in safe mode:
        hdfs dfsadmin -report
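    
    hdfs dfsadmin -report prints the overall cluster status; safe mode can also be queried directly with the standard command below:
        hdfs dfsadmin -safemode get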
    
    bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
    
    Check the NameNode web UI at master:50070; the files land under /flume/events/16-04-20/
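    
    The same output can be checked from the command line with the HDFS shell (the dated subdirectory below is just an example):
        hdfs dfs -ls /flume/events/
        hdfs dfs -cat /flume/events/16-04-20/*/events-*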
    
    
    Start command:
    bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
    ################################################################
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # exec means the source runs the given shell command
    # Describe/configure the source
    a1.sources.r1.type = exec
    # -F follows the file by name; -f follows the file's inode
    a1.sources.r1.command = tail -F /home/admin/log/test.log
    a1.sources.r1.channels = c1
    
    # Describe the sink
    # Sink target
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    # Target directory; Flume substitutes the date/time escape sequences in the path
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
    # File name prefix
    a1.sinks.k1.hdfs.filePrefix = events-
    
    # Roll to a new directory every 10 minutes
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    
    # Time to wait before rolling the file (seconds)
    a1.sinks.k1.hdfs.rollInterval = 3
    
    # File size limit that triggers a roll (bytes)
    a1.sinks.k1.hdfs.rollSize = 500
    
    # Number of events written before the file is rolled
    a1.sinks.k1.hdfs.rollCount = 20
    
    # Write to HDFS in batches of 5 events
    a1.sinks.k1.hdfs.batchSize = 5
    
    # Use local time when formatting the directory path
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # File type of the generated files; the default is SequenceFile, DataStream writes plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

     

    3. Receiving data from an avro port

    Start the receiving service on the first machine:

    Receive data from the avro port and sink it to the logger:
    bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
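    The -Dflume.root.logger=INFO,console option sets Flume's root log level to INFO and sends the log output to the console, so the events received by the logger sink below are printed straight to the terminal.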
    #########
    
    Collection configuration file, avro-logger.conf:
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # The avro source acts as a receiving server, bound to this host
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4141
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    Send data from the second machine:

    Send the contents of test.log to port 4141 on the first machine.

    Send the data:
    $ bin/flume-ng avro-client -H <first machine IP> -p 4141 -F /home/admin/log/test.log
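    
    Per the Flume user guide, the avro-client reads from standard input when -F/--filename is omitted, so a single test event can also be sent like this (a sketch, with the same host placeholder):
    echo "hello flume" | bin/flume-ng avro-client -H <first machine IP> -p 4141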

     

    More source and sink components

    Flume supports many more source and sink types; see the official user guide for the details:

    http://flume.apache.org/FlumeUserGuide.html
