  • Flume

    1 - Flume overview and how it works

     ==============================================================================================================================================================

    2 - Flume example: network data collection - installing Flume

     

     ===============================================================================================================================================================

    3 - Flume example: network data collection - configuring Flume

    netcat-logger.conf
     
    # Name the components of this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe and configure the source component: r1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.174.
    a1.sources.r1.port = 44444
    # Describe and configure the sink component: k1
    a1.sinks.k1.type = logger
    # Describe and configure the channel component; this one buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Wire the source, channel and sink together
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1 
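Flume's memory channel checks at startup that transactionCapacity is not larger than capacity. The values above (1000 and 100) satisfy this. A sink whose batch size exceeds transactionCapacity fails with a channel exception at runtime. The sketch below is a hypothetical helper, check_channel_capacity, that is not part of Flume. It scans a Flume properties file with awk and reports any channel whose transactionCapacity exceeds its capacity. It assumes one "agent.channels.name.property = value" pair per line, as in the files in this post.

```bash
#!/usr/bin/env bash
# Hypothetical helper (not part of Flume): report memory channels whose
# transactionCapacity is larger than their capacity.
# Assumes "agent.channels.name.property = value" on one line, as in this post.
check_channel_capacity() {
  local conf=$1
  awk -F'=' '
    /^[ \t]*#/ { next }                    # skip comment lines
    NF >= 2 {
      key = $1; val = $2
      gsub(/[ \t]/, "", key); gsub(/[ \t]/, "", val)
      n = split(key, parts, ".")
      # keys look like a1.channels.c1.capacity -> 4 dot-separated parts
      if (n == 4 && parts[2] == "channels") {
        ch = parts[1] "." parts[3]
        if (parts[4] == "capacity") cap[ch] = val + 0
        else if (parts[4] == "transactionCapacity") txn[ch] = val + 0
      }
    }
    END {
      bad = 0
      for (ch in txn)
        if ((ch in cap) && txn[ch] > cap[ch]) {
          printf "%s: transactionCapacity %d > capacity %d\n", ch, txn[ch], cap[ch]
          bad = 1
        }
      if (!bad) print "OK"
      exit bad
    }
  ' "$conf"
}
```

Usage: check_channel_capacity netcat-logger.conf should print OK for the file above. It returns a non-zero status when a channel is misconfigured.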
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

     

     

     At this point the agent is listening on this host and port for incoming data.
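Any TCP client that sends newline-terminated text can feed this listener. The netcat source turns each line into one event, which the logger sink prints. With its default settings (ack-every-event) it answers every received line with OK. The sketch below uses a hypothetical function name, send_to_netcat. It assumes bash, because it relies on bash's /dev/tcp redirection. It sends each argument as one line and prints the acknowledgement. Use the address configured in a1.sources.r1.bind and port 44444.

```bash
#!/usr/bin/env bash
# Hypothetical helper: send each argument as one line to a Flume netcat source
# and print the acknowledgement it returns (bash-only: uses /dev/tcp).
send_to_netcat() {
  local host=$1 port=$2
  shift 2
  (
    # Open a read/write TCP connection on file descriptor 3
    exec 3<>"/dev/tcp/${host}/${port}" || exit 1
    for line in "$@"; do
      printf '%s\n' "$line" >&3              # one line = one Flume event
      IFS= read -r -t 5 ack <&3 || exit 2    # wait up to 5 s for the reply
      printf 'sent: %s -> %s\n' "$line" "$ack"
    done
    exec 3>&-                                # close the connection
  )
}
```

For example, send_to_netcat <bind address> 44444 hello flume sends two events. The agent console should log each one.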

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

     

    ============================================================================================================================================================== 

     5 - Flume example: collecting data from a directory - workflow

     ==========================================================================================================================================================

    6 - Flume example: collecting data from a directory - writing the configuration file

     

    vim spooldir.conf
     
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    ## Note: never drop a file with the same name into the monitored directory twice
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /export/servers/dirfile
    a1.sources.r1.fileHeader = true
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # Output file type: the default is SequenceFile; DataStream writes plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1 
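The note about same-name files follows from how the spooling directory source works. A file must not change after it appears in spoolDir, and a file name must not be reused. A processed file is renamed with the .COMPLETED suffix by default. One way to respect both rules is sketched below with a hypothetical helper, spool_drop. It copies the file into a staging directory on the same filesystem, then renames it into the spool directory under a unique, timestamped name. A mv within one filesystem is an atomic rename, so the source never sees a half-written file.

```bash
#!/usr/bin/env bash
# Hypothetical helper: deliver SRC into SPOOL_DIR so the spooldir source never
# sees a half-written file or a reused name. STAGING_DIR must be on the same
# filesystem as SPOOL_DIR so the final mv is an atomic rename.
spool_drop() {
  local src=$1 spool=$2 staging=$3
  local base name tries=0
  base=$(basename "$src")
  mkdir -p "$staging" || return 1
  while :; do
    # Candidate name: original name + timestamp + shell PID + random number
    name="${base}.$(date +%Y%m%d%H%M%S).$$.${RANDOM}"
    # Accept it only if it is not in use and was never processed (.COMPLETED)
    if [ ! -e "$spool/$name" ] && [ ! -e "$spool/$name.COMPLETED" ] \
       && [ ! -e "$staging/$name" ]; then
      break
    fi
    tries=$((tries + 1))
    if [ "$tries" -ge 10 ]; then
      echo "spool_drop: no free name for $base" >&2
      return 1
    fi
  done
  cp "$src" "$staging/$name" || return 1          # slow copy happens outside spoolDir
  mv "$staging/$name" "$spool/$name" || return 1  # same-filesystem rename: atomic
  printf '%s\n' "$spool/$name"
}
```

For example, spool_drop access.log /export/servers/dirfile /export/servers/dirfile-staging can be run repeatedly with the same input file. Each call produces a new name in the spool directory. The staging path here is an assumption.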

     

     =========================================================================================================================================

    7 - Flume example: collecting data from a directory - running and testing

    Run from the Flume home directory:  bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console
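When you check the result in HDFS, the target directory comes from hdfs.path, with %y-%m-%d/%H%M filled in from the agent's local clock (useLocalTimeStamp = true). The settings round = true, roundValue = 10 and roundUnit = minute round the minute down to the start of its 10-minute window, so an event at 15:37 lands under .../1530/. A sketch of that mapping follows. It assumes GNU date and the agent's local time zone, and bucket_dir is a hypothetical name.

```bash
#!/usr/bin/env bash
# Hypothetical helper: reproduce the directory suffix %y-%m-%d/%H%M that the
# HDFS sink uses with round=true, roundValue=10, roundUnit=minute.
# Assumes GNU date (for -d @EPOCH) and the local time zone of the agent.
bucket_dir() {
  local ts=$1                              # event time, Unix epoch seconds
  local day hour min
  read -r day hour min <<< "$(date -d "@${ts}" +'%y-%m-%d %H %M')"
  min=$(( 10#$min - 10#$min % 10 ))        # round the minute down to a multiple of 10
  printf '%s/%s%02d\n' "$day" "$hour" "$min"
}
```

For example, bucket_dir "$(date +%s)" prints the subdirectory under /spooldir/files/ where events arriving now should appear.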

     

     

     

     =====================================================================================================================================================

    8 - Flume example: collecting data from a file - step analysis

     ==============================================================================================================================================================

    9 - Flume example: collecting data from a file - running and testing

    (1) Define the Flume configuration file

    vim tail-file.conf
     
    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    # Describe/configure tail -F source1
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log
    agent1.sources.source1.channels = channel1
    # Describe sink1
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize = 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat = Text
    agent1.sinks.sink1.hdfs.round = true
    agent1.sinks.sink1.hdfs.roundValue = 10
    agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
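The exec source runs the configured command and turns each line it prints into an event. Using tail -F rather than tail -f matters here. It follows the file by name, so if access_log is rotated (renamed and recreated), tail switches to the new file instead of staying on the old one. The hypothetical demo_tail_F function below shows this with a temporary file. It assumes GNU coreutils tail, and the sleep values only give tail time to notice each change.

```bash
#!/usr/bin/env bash
# Hypothetical demo (GNU tail assumed): tail -F keeps following a log file by
# name across a rotation, which is why the exec source uses it.
demo_tail_F() {
  local dir log out pid
  dir=$(mktemp -d)
  log="$dir/access_log"
  out="$dir/collected"
  : > "$log"                              # start with an empty log file
  tail -F "$log" > "$out" 2>/dev/null &   # what the exec source would run
  pid=$!
  sleep 1
  echo "before-rotation" >> "$log"
  sleep 1
  mv "$log" "$log.1"                      # rotate: the old file is renamed...
  echo "after-rotation" > "$log"          # ...and a new one appears under the same name
  sleep 3
  kill "$pid"
  wait "$pid" 2>/dev/null
  cat "$out"                              # lines tail -F emitted
  rm -rf "$dir"
}
```

With tail -f the second line would normally be missed, because tail keeps reading the renamed file.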

     -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

     (2) Step 2: Start Flume

    cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin
    bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console

     ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    (3) Write a script that keeps appending to the monitored log file

    vim tail-file.sh
     
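    #!/bin/bash
    # Append the current date to the file tailed by the exec source, twice per second
    mkdir -p /export/servers/taillogs
    while true
    do
      date >> /export/servers/taillogs/access_log
      sleep 0.5
    done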
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    (4) Run the script

     

     

     ===================================================================================================================================================

    10 - Flume example: cascading Flume agents - component analysis

     

    ===========================================================================================================================================================

    11 - Flume example: cascading Flume agents - writing the configuration files

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    vim tail-avro-avro-logger.conf
     
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /export/servers/taillogs/access_log
    a1.sources.r1.channels = c1
    # Describe the sink
    ## The avro sink acts as the data sender
    a1.sinks = k1
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = 192.168.187.130
    a1.sinks.k1.port = 4141
    a1.sinks.k1.batch-size = 10
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

     ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

     ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    (4)

    vim avro-hdfs.conf
     
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    ## The avro source acts as the receiving service
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 192.168.187.130
    a1.sources.r1.port = 4141
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node01:8020/avro/hdfs/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # Output file type: the default is SequenceFile; DataStream writes plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1 
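In the cascade the two files meet at one address. The avro sink on node02 sends to a1.sinks.k1.hostname:a1.sinks.k1.port, and the avro source on node03 must listen on the same address (a1.sources.r1.bind:a1.sources.r1.port), here 192.168.187.130:4141. The hypothetical helpers below read a value from a Flume properties file, with the last occurrence winning, as with Java Properties loading. They then compare both ends. They assume the component names a1, k1 and r1 used in this post. The comparison is a strict string match, so a source bound to 0.0.0.0 would be reported as a mismatch even though it would accept the connection.

```bash
#!/usr/bin/env bash
# Hypothetical helpers for the two-tier (cascade) setup in this post.

# conf_get FILE KEY: print the value of KEY in a Flume properties file.
# Comment lines are skipped; if a key appears twice the last value wins.
conf_get() {
  awk -v k="$2" '
    /^[ \t]*#/ { next }
    {
      eq = index($0, "=")
      if (eq == 0) next
      name = substr($0, 1, eq - 1)
      val = substr($0, eq + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", name)
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      if (name == k) found = val
    }
    END { if (found != "") print found }
  ' "$1"
}

# check_avro_link UPSTREAM_CONF DOWNSTREAM_CONF: the avro sink target must
# equal the avro source bind address and port (component names a1/k1/r1 assumed).
check_avro_link() {
  local sink_host sink_port src_host src_port
  sink_host=$(conf_get "$1" a1.sinks.k1.hostname)
  sink_port=$(conf_get "$1" a1.sinks.k1.port)
  src_host=$(conf_get "$2" a1.sources.r1.bind)
  src_port=$(conf_get "$2" a1.sources.r1.port)
  if [ "$sink_host:$sink_port" = "$src_host:$src_port" ]; then
    echo "OK $sink_host:$sink_port"
  else
    echo "MISMATCH sink=$sink_host:$sink_port source=$src_host:$src_port"
    return 1
  fi
}
```

For the two files above, check_avro_link tail-avro-avro-logger.conf avro-hdfs.conf should print OK 192.168.187.130:4141.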
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    (1) Start the Flume process on node03
     
    cd /export/servers/apache-flume-1.8.0-bin
    bin/flume-ng agent -c conf -f conf/avro-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
     
    (2) Start the Flume process on node02
     
    cd /export/servers/apache-flume-1.8.0-bin/
    bin/flume-ng agent -c conf -f conf/tail-avro-avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
     
    (3) On node02, run the shell script to generate data
    cd /export/servers/shells
    sh tail-file.sh
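The start order above matters. The avro source on node03 is the server, so it should be listening before the avro sink on node02 starts sending. Otherwise the sink typically keeps logging connection failures and retrying until the receiver is up. The wait_for_port helper below is hypothetical. It assumes bash's /dev/tcp and is meant to run on node02 before step (2). It polls the receiver until the port accepts connections or a timeout expires.

```bash
#!/usr/bin/env bash
# Hypothetical helper: wait until HOST:PORT accepts TCP connections, or give up
# after TIMEOUT seconds (default 60). Uses bash's /dev/tcp, so it needs bash.
wait_for_port() {
  local host=$1 port=$2 timeout=${3:-60}
  local deadline=$(( SECONDS + timeout ))
  while (( SECONDS < deadline )); do
    # Try to open (and immediately close) a connection in a subshell
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      echo "ready: ${host}:${port}"
      return 0
    fi
    sleep 1
  done
  echo "timed out waiting for ${host}:${port}" >&2
  return 1
}
```

For example, on node02: wait_for_port 192.168.187.130 4141 120 && bin/flume-ng agent -c conf -f conf/tail-avro-avro-logger.conf -n a1 -Dflume.root.logger=INFO,console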

     ==================================================================================================================================================================


  • Original article: https://www.cnblogs.com/curedfisher/p/12632116.html