zoukankan      html  css  js  c++  java
  • Apache Flume 学习笔记

    # 从http://flume.apache.org/download.html 下载flume
    #############################################
    # 概述:Flume 是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件。
    # Flume的核心是把数据从数据源(source)收集过来,送到指定的目的地(sink)。为了保证输送的过程一定
    # 成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,再删除自
    # 己缓存的数据。
    #############################################
    # 将安装包上传到Linux后,解压并重命名目录:
    tar zxvf apache-flume-1.8.0-bin.tar.gz
    rm -rf apache-flume-1.8.0-bin.tar.gz
    mv apache-flume-1.8.0-bin/ flume-1.8.0
    cd flume-1.8.0/conf/
    cp flume-env.sh.template flume-env.sh
    
    vim flume-env.sh
    # 导入正确的JDK路径
    export JAVA_HOME=/usr/local/src/jdk1.8.0_161
    
    
    ########################################
    # 从网络端口接收数据,下沉到logger
    ######################################## 采集配置文件,netcat-logger.conf
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sinks
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ######################################## 采集配置文件 结束
    
    # 启动命令
    bin/flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
    # 将出现监听: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
    # 用另一个终端来测试:
    yum install -y telnet
    telnet localhost 44444 # 登录成功会显示 Connected to localhost.  Escape character is '^]'.
    hello, world.  # 发送一段文字。 看启动监听的终端有没有收到。
    # 监听端:2018-05-27 20:33:29,974 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 2C 77 6F 72 6C 64 2E 0D          hello,world.. }
    
    
    
    
    ##########################################
    # 采集目录到HDFS上(需先启动好HDFS)。
    ################################## spooldir-hdfs.cnf 文件:
    
    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    # 注意不能往监控目录中重复放置同名文件,一旦重名,服务将出错并停止。
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/logs
    a1.sources.r1.fileHeader = true
    
    # Describe the sinks
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # 生成的文件类型,默认是Sequencefile, 可用DataStream ,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ##################################
    
    # 启动命令 如果/root/logs中已有文件,则会被立刻采集到HDFS 
    bin/flume-ng agent -c conf/ -f conf/spooldir-hdfs.cnf -n a1 -Dflume.root.logger=INFO,console
    # 成功后:2018-05-27 22:08:02,505 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
    
    # 在/root/logs/下创建一个文件,监听端会显示:Writer callback called.
    # HDFS上则得到文件:/flume/events/18-05-27/2210/events-.1527430208616
    # 注意 spooldir 不能往源目录/root/logs/中重复放置同名文件,一旦重名,服务将出错并停止工作。
    
    
    
    
    ##########################################
    ### 增量采集内容变化的文件到HDFS 
    ########################################## tail-hdfs.cnf 文件
    
    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    # exec 源通过 tail -F 持续读取指定文件中新追加的内容;注意 exec 源不保证可靠性,agent 出错或重启时可能丢失数据。
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /root/logs/test.log
    a1.sources.r1.channels = c1
    
    # Describe the sinks
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # 生成的文件类型,默认是Sequencefile, 可用DataStream ,则为普通文本
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ##########################################
    # 启动命令。如果 /root/logs/test.log 已存在,tail -F 会先输出其末尾若干行(默认10行),随后持续把新追加的内容采集到HDFS
    bin/flume-ng agent -c conf -f conf/tail-hdfs.cnf -n a1 -Dflume.root.logger=INFO,console
    
    # 模拟数据不断写入.
    while true; do date >>/root/logs/test.log;sleep 1.5;done
    
    
    
    
    ########################################## 
    #Load balance 负载均衡
    ##########################################
    # 使用三台机器,设置两级flume:前面一台负责采集,以轮询方式发往后面的两台;后两台接收前一台发来的数据,再下沉到目标。
    scp -r flume-1.8.0/ slave2:/usr/local/src/
    scp -r flume-1.8.0/ slave3:/usr/local/src/
    
    # 使用slave1在最前,slave2 , slave3在其后的方式。
    
    ################# 第一级slave1 配置文件:exec-avro.cnf
    
    #agent1 name
    agent1.channels = c1
    agent1.sources = r1
    agent1.sinks = k1 k2
    
    
    # set group
    agent1.sinkgroups = g1
    
    # set channel
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    
    agent1.sources.r1.channels = c1
    agent1.sources.r1.type = exec
    agent1.sources.r1.command = tail -F /root/logs/123.log
    
    # set sink1
    agent1.sinks.k1.channel = c1
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = slave2
    agent1.sinks.k1.port = 52020
    
    # set sink2
    agent1.sinks.k2.channel = c1
    agent1.sinks.k2.type = avro
    agent1.sinks.k2.hostname = slave3
    agent1.sinks.k2.port = 52020
    
    # set sink group
    agent1.sinkgroups.g1.sinks = k1 k2
    
    # set load balance (round-robin selector with backoff)
    agent1.sinkgroups.g1.processor.type = load_balance
    agent1.sinkgroups.g1.processor.backoff = true
    agent1.sinkgroups.g1.processor.selector = round_robin
    agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
    
    ############# end ##############
    
    ################# 第二级slave2 配置文件:avro-logger.cnf
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = slave2
    a1.sources.r1.port = 52020
    
    # Describe the sinks
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ############# slave2 end ##############
    
    ################# 第二级slave3 配置文件:avro-logger.cnf 唯一的改变是slave3
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = slave3
    a1.sources.r1.port = 52020
    
    # Describe the sinks
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ############# slave3 end ##############
    
    ## 先启动第二级的slave2, slave3 
    bin/flume-ng agent -c conf -f conf/avro-logger.cnf -n a1 -Dflume.root.logger=INFO,console
    ## 再启动一级的slave1 
    bin/flume-ng agent -c conf -f conf/exec-avro.cnf -n agent1 -Dflume.root.logger=INFO,console
    # 启动成功后,第二级终端会出现类似:CONNECTED: /192.168.112.11:56404 
    # 而后续终止第一级时,第二级会出现类似: /192.168.112.11:56404 disconnected. 
    
    # 模拟数据写入. 会看到仅第二级有采集动作,第一级不作显示。
    while true; do date >>/root/logs/123.log;sleep 1;done
    
    
    ############################################# 
    #  Failover 容错
    #  同一时间后端只有一台机器工作.
    #############################################
    # 还是使用三台机器,设置二级flume, 前面一台采集,发往后面的某一台,优先级最高的收集前一台发来的数据;
    # 如果这台机器挂了,另一台自动替补
    scp -r flume-1.8.0/ slave2:/usr/local/src/
    scp -r flume-1.8.0/ slave3:/usr/local/src/
    
    # 使用slave1在最前,slave2 , slave3在其后的方式。
    
    ################# 第一级slave1 配置文件:exec-avro.cnf
    
    #agent1 name
    agent1.channels = c1
    agent1.sources = r1
    agent1.sinks = k1 k2
    
    
    # set group
    agent1.sinkgroups = g1
    
    # set channel
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    
    agent1.sources.r1.channels = c1
    agent1.sources.r1.type = exec
    agent1.sources.r1.command = tail -F /root/logs/456.log
    
    # set sink1
    agent1.sinks.k1.channel = c1
    agent1.sinks.k1.type = avro
    agent1.sinks.k1.hostname = slave2
    agent1.sinks.k1.port = 52020
    
    # set sink2
    agent1.sinks.k2.channel = c1
    agent1.sinks.k2.type = avro
    agent1.sinks.k2.hostname = slave3
    agent1.sinks.k2.port = 52020
    
    # set sink group
    agent1.sinkgroups.g1.sinks = k1 k2
    
    # set failover
    agent1.sinkgroups.g1.processor.type = failover
    agent1.sinkgroups.g1.processor.priority.k1 = 10
    agent1.sinkgroups.g1.processor.priority.k2 = 1
    agent1.sinkgroups.g1.processor.maxpenalty = 10000
    
    ############# end ##############
    
    ################# 第二级slave2 配置文件:avro-logger.cnf
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = slave2
    a1.sources.r1.port = 52020
    
    # Describe the sinks
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ############# slave2 end ##############
    
    ################# 第二级slave3 配置文件:avro-logger.cnf 唯一的改变是slave3
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the sources
    a1.sources.r1.type = avro
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = slave3
    a1.sources.r1.port = 52020
    
    # Describe the sinks
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ############# slave3 end ##############
    
    ## 先启动第二级的slave3, slave2 
    bin/flume-ng agent -c conf -f conf/avro-logger.cnf -n a1 -Dflume.root.logger=INFO,console
    ## 再启动一级的slave1 
    bin/flume-ng agent -c conf -f conf/exec-avro.cnf -n agent1 -Dflume.root.logger=INFO,console
    # 启动成功后,第二级终端会出现类似:CONNECTED: /192.168.112.11:56404 
    # 而后续终止第一级时,第二级会出现类似: /192.168.112.11:56404 disconnected. 
    
    # 模拟数据写入. 会看到仅第二级slave2有采集动作,第一级不作显示。slave3待命。
    while true; do date >>/root/logs/456.log;sleep 1;done
    # 一旦slave2终止,则slave3自动顶上,继续接收。

    更新一个练习:

    ################################################################ 
    # 案例:
    # A、B两台日志服务器实时生产日志,主要类型为access.log, nginx.log, web.log
    # 要求:把A、B中的三种日志采集汇总到C机器上,然后收集到HDFS
    # 且HDFS中要求按类别存放到不同的目录
    ################################################################
    ### 现将slave1 slave2 slave3 分别对应A B C
    ### A 和 B 使用相同的配置文件 exec_source_avro_sink.conf(sink 的 hostname 均指向 C,即 slave3)
    
    # Name the components on this agent
    a1.sources = r1 r2 r3 
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /root/logs1/access.log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = type
    a1.sources.r1.interceptors.i1.value = access
    
    a1.sources.r2.type = exec
    a1.sources.r2.command = tail -F /root/logs1/nginx.log
    a1.sources.r2.interceptors = i2
    a1.sources.r2.interceptors.i2.type = static
    a1.sources.r2.interceptors.i2.key = type
    a1.sources.r2.interceptors.i2.value = nginx
    
    a1.sources.r3.type = exec
    a1.sources.r3.command = tail -F /root/logs1/web.log
    a1.sources.r3.interceptors = i3
    a1.sources.r3.interceptors.i3.type = static
    a1.sources.r3.interceptors.i3.key = type
    a1.sources.r3.interceptors.i3.value = web
    
    # Describe the sink 发送到下一级主机
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = slave3
    a1.sinks.k1.port = 41414
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 2000000
    a1.channels.c1.transactionCapacity = 100000
    
    # Bind the sources and sink to the channel
    a1.sources.r1.channels = c1
    a1.sources.r2.channels = c1
    a1.sources.r3.channels = c1
    a1.sinks.k1.channel = c1
    ### end ###
    
    
    ### C 配置文件 avro_source_hdfs_sink.conf
    
    # 定义agent名, source channel sink的名称
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 定义source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = slave3
    a1.sources.r1.port = 41414
    
    # 添加时间拦截器
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    
    # 定义channels
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 20000
    a1.channels.c1.transactionCapacity = 10000
    
    # 定义sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://master:9000/source/logs/%{type}/%Y%m%d
    a1.sinks.k1.hdfs.filePrefix = events
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    
    # 时间类型
    # a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # 生成的文件不按条数生成
    a1.sinks.k1.hdfs.rollCount = 0
    # 每30秒滚动生成一个新文件(设为0则不按时间滚动)
    a1.sinks.k1.hdfs.rollInterval = 30
    # 生成的文件按大小生成
    a1.sinks.k1.hdfs.rollSize = 10485760
    # 批量写入HDFS的个数
    a1.sinks.k1.hdfs.batchSize = 20
    # flume操作hdfs的线程数(包括新建,写入等)
    a1.sinks.k1.hdfs.threadsPoolSize = 10
    # 操作hdfs超时时间
    a1.sinks.k1.hdfs.callTimeout = 30000
    
    # 组装source channel sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    ### end ###
    
    ## 先启动第二级的slave3(即C,运行 avro_source_hdfs_sink.conf)
    bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
    ## 再启动第一级的slave1和slave2(即A和B,运行 exec_source_avro_sink.conf)
    bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
    # 启动成功后,slave3(C)会出现类似:CONNECTED: /192.168.112.11:56404 
    
    # 模拟数据写入. 
    while true; do echo "access..  `date` " >>/root/logs1/access.log;sleep 1;done
    while true; do echo "nginx..  `date` " >>/root/logs1/nginx.log;sleep 1;done
    while true; do echo "web..  `date` " >>/root/logs1/web.log;sleep 1;done
    
    # 查看hdfs上采集成功。

    今天的练习完成,成功了。

  • 相关阅读:
    CodeForces 734F Anton and School
    CodeForces 733F Drivers Dissatisfaction
    CodeForces 733C Epidemic in Monstropolis
    ZOJ 3498 Javabeans
    ZOJ 3497 Mistwald
    ZOJ 3495 Lego Bricks
    CodeForces 732F Tourist Reform
    CodeForces 732E Sockets
    CodeForces 731E Funny Game
    CodeForces 731D 80-th Level Archeology
  • 原文地址:https://www.cnblogs.com/frx9527/p/flume.html
Copyright © 2011-2022 走看看