zoukankan      html  css  js  c++  java
  • flume简介

    组件介绍:

    代理 Flume Agent

    1 Flume内部有一个或者多个Agent
    2 每一个Agent是一个独立的守护进程(JVM)
    3 从客户端哪儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传到下一个目的节点Agent
    4 Agent主要由source、channel、sink三个组件组成。

    agent source

    1 一个flume源
    2 负责一个外部源(数据生成器),如一个web服务器传递给他的事件
    3 该外部源将它的事件以flume可以识别的格式(event)发送到flume中
    4 当一个flume源接收到一个事件时,其将通过一个或多个通道存储该事件

    agent channel

    1 通道: 采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理
    2 所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉它在source和sink间起着一共桥梁的作用,
    channel是一个完整的事务,这一点保证了数据在收发的时候的一致性.并且它可以和任意数量的source和sink链接
    3 可以通过参数设置event的最大个数 4 Flume通常选择FileChannel,而不使用Memory Channel。 Memory Channel: 内存存储事务,吞吐率极高,但存在丢数据风险 File Channel: 本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)

    监控网络端口使用

    netcat
     1 # example.conf: A single-node Flume configuration
     2 
     3 # Name the components on this agent
     4 a1.sources = r1
     5 a1.sinks = k1
     6 a1.channels = c1
     7 
     8 # Describe/configure the source
     9 a1.sources.r1.type = netcat
    10 a1.sources.r1.bind = localhost
    11 a1.sources.r1.port = 44444
    12 
    13 # Describe the sink
    14 a1.sinks.k1.type = logger
    15 
    16 # Use a channel which buffers events in memory
    17 a1.channels.c1.type = memory
    18 a1.channels.c1.capacity = 1000
    19 a1.channels.c1.transactionCapacity = 100
    20 
    21 # Bind the source and sink to the channel
    22 a1.sources.r1.channels = c1
    23 a1.sinks.k1.channel = c1

    启动命令:flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console

    监控具体文件使用

    exec
     1 # Name the components on this agent
     2 a1.sources = r1
     3 a1.sinks = k1
     4 a1.channels = c1
     5 
     6 # Describe/configure the source
     7 a1.sources.r1.type = exec
     8 a1.sources.r1.command = tail -F /home/log/data.log
     9 a1.sources.r1.shell = /bin/bash -c
    10 
    11 # Describe the sink
    12 a1.sinks.k1.type = logger
    13 
    14 # Use a channel which buffers events in memory
    15 a1.channels.c1.type = memory
    16 
    17 # Bind the source and sink to the channel
    18 a1.sources.r1.channels = c1
    19 a1.sinks.k1.channel = c1

    启动命令:flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console

    flume监控日志文件并持久化到hdfs

     1 # Name the components on this agent
     2 a1.sources = r1
     3 a1.sinks = k1
     4 a1.channels = c1
     5 
     6 # Describe/configure the source
     7 a1.sources.r1.type = exec
     8 a1.sources.r1.command = tail -F /home/data/data.log
     9 a1.sources.r1.channels = c1
    10 
    11 # Describe the sink
    12 a1.sinks.k1.type = hdfs
    13 a1.sinks.k1.channel = c1
    14 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
    15 a1.sinks.k1.hdfs.filePrefix = events-
    16 a1.sinks.k1.hdfs.fileType=DataStream
    17 a1.sinks.k1.hdfs.useLocalTimeStamp = true  #必须得写 应为14行用到时间数据
    18 
    19 # Use a channel which buffers events in memory
    20 a1.channels.c1.type = memory
    21 
    22 # Bind the source and sink to the channel
    23 a1.sources.r1.channels = c1
    24 a1.sinks.k1.channel = c1

    别人写的

     1 # Name the components on this agent
     2 a1.sources = r1
     3 a1.sinks = k1
     4 a1.channels = c1
     5 
     6 # Describe/configure the source
     7 ## exec表示flume回去调用给的命令,然后从给的命令的结果中去拿数据
     8 a1.sources.r1.type = exec
     9 ## 使用tail这个命令来读数据
    10 a1.sources.r1.command = tail -F /home/tuzq/software/flumedata/test.log
    11 a1.sources.r1.channels = c1
    12 
    13 # Describe the sink
    14 ## 表示下沉到hdfs,类型决定了下面的参数
    15 a1.sinks.k1.type = hdfs
    16 ## sinks.k1只能连接一个channel,source可以配置多个
    17 a1.sinks.k1.channel = c1
    18 ## 下面的配置告诉用hdfs去写文件的时候写到什么位置,下面的表示不是写死的,而是可以动态的变化的。表示输出的目录名称是可变的
    19 a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
    20 ##表示最后的文件的前缀
    21 a1.sinks.k1.hdfs.filePrefix = events-
    22 ## 表示到了需要触发的时间时,是否要更新文件夹,true:表示要
    23 a1.sinks.k1.hdfs.round = true
    24 ## 表示每隔1分钟改变一次
    25 a1.sinks.k1.hdfs.roundValue = 1
    26 ## 切换文件的时候的时间单位是分钟
    27 a1.sinks.k1.hdfs.roundUnit = minute
    28 ## 表示只要过了3秒钟,就切换生成一个新的文件
    29 a1.sinks.k1.hdfs.rollInterval = 3
    30 ## 如果记录的文件大于20字节时切换一次
    31 a1.sinks.k1.hdfs.rollSize = 20
    32 ## 当写了5个事件时触发
    33 a1.sinks.k1.hdfs.rollCount = 5
    34 ## 收到了多少条消息往dfs中追加内容
    35 a1.sinks.k1.hdfs.batchSize = 10
    36 ## 使用本地时间戳
    37 a1.sinks.k1.hdfs.useLocalTimeStamp = true
    38 #生成的文件类型,默认是Sequencefile,可用DataStream:为普通文本
    39 a1.sinks.k1.hdfs.fileType = DataStream
    40 
    41 # Use a channel which buffers events in memory
    42 ##使用内存的方式
    43 a1.channels.c1.type = memory
    44 a1.channels.c1.capacity = 1000
    45 a1.channels.c1.transactionCapacity = 100
    46 
    47 # Bind the source and sink to the channel
    48 a1.sources.r1.channels = c1
    49 a1.sinks.k1.channel = c1
  • 相关阅读:
    mysql基础
    EM算法总结
    机器学习之PCA(1)
    C语言socket编程<二>
    计算机网络·实验一:
    机器学习之GMM-EM
    C语言socket编程<一>socket之Winsock API
    【翻译】java-TCP-socket网络编程2
    【翻译】java-TCP-socket网络编程1
    【翻译】Java IO 关系总览和整理
  • 原文地址:https://www.cnblogs.com/luozeng/p/8491149.html
Copyright © 2011-2022 走看看