  • Flume Environment Installation

     Download the source package:

    http://archive.apache.org/dist/flume/1.8.0/

    Cluster environment:

    master 192.168.1.99
    slave1 192.168.1.100
    slave2 192.168.1.101

    Download and unpack the installation package:

    # Master
    wget http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz -P /usr/local/src
    cd /usr/local/src
    tar -zxvf apache-flume-1.8.0-bin.tar.gz
    mv apache-flume-1.8.0-bin /usr/local/flume
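
    To confirm the unpacked installation works before going further, a quick sanity check is to print the Flume version (this assumes JAVA_HOME is already set on the host):

    /usr/local/flume/bin/flume-ng version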

    Flume configuration:

    #Netcat

    cd /usr/local/flume/conf

    vim flume-netcat.conf

    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1
    agent.channels = c1
    
    # Describe/configuration the source
    agent.sources.r1.type = netcat
    agent.sources.r1.bind = master
    agent.sources.r1.port = 44444
    
    #Describe the sink
    agent.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory 
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    agent.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1
    agent.sinks.k1.channel = c1

    Verification:

    Server:
    /usr/local/flume/bin/flume-ng agent -f flume-netcat.conf -n agent -Dflume.root.logger=INFO,console
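
    An equivalent, more explicit invocation also passes the conf directory, so flume-env.sh and the log4j settings under /usr/local/flume/conf are picked up (a sketch, assuming the command is run from that directory):

    /usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf --conf-file flume-netcat.conf --name agent -Dflume.root.logger=INFO,console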

    Client:
    telnet master 44444
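
    If telnet is not installed on the client machine, nc can be used against the same netcat source instead (an alternative, not part of the original walkthrough):

    nc master 44444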

    The result is shown as a screenshot in the original post.

    #Exec

    cd /usr/local/flume/conf

    vim flume-exec.conf

    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1
    agent.channels = c1
    
    # Describe/configuration the source
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -f /root/test.log
    
    #Describe the sink
    agent.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory 
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    agent.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1
    agent.sinks.k1.channel = c1

    Verification:

    Server:
    /usr/local/flume/bin/flume-ng agent -f flume-exec.conf -n agent -Dflume.root.logger=INFO,console

    Client:
    echo "wangzai" > /root/test.log

    The result is shown as a screenshot in the original post.

    #HDFS

    cd /usr/local/flume/conf

    vim flume-exec-hdfs.conf

    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1
    agent.channels = c1

    # Describe/configuration the source
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -f /root/test.log
    agent.sources.r1.shell = /bin/bash -c

    #Describe the sink
    agent.sinks.k1.type = hdfs
    agent.sinks.k1.hdfs.path = hdfs://master:9000/data/flume/tail
    agent.sinks.k1.hdfs.fileType=DataStream
    agent.sinks.k1.hdfs.writeFormat=Text

    ## How long (in seconds) the HDFS sink waits before rolling the temporary file into the final target file; the default is 30s.
    ## Setting it to 0 disables time-based rolling.
    # agent.sinks.k1.hdfs.rollInterval = 0
    ## Roll to the next file once the current one reaches 128 MB (134217728 bytes).
    #agent.sinks.k1.hdfs.rollSize = 134217728
    #agent.sinks.k1.hdfs.rollCount = 1000000
    #agent.sinks.k1.hdfs.batchSize=10

    # Use a channel which buffers events in memory
    agent.channels.c1.type = memory
    #agent.channels.c1.capacity = 1000
    #agent.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1
    agent.sinks.k1.channel = c1

    Verification:

    Server:
    /usr/local/flume/bin/flume-ng agent -f flume-exec-hdfs.conf -n agent -Dflume.root.logger=INFO,console

    Client:
    echo "wangzai" > /root/test.log

    The result is shown as a screenshot in the original post.

    #Kafka

    cd /usr/local/flume/conf

    vim flume-exec-kafka.conf

    # Name the components on this agent
    agent.sources = r1
    agent.sinks = k1
    agent.channels = c1
    
    # Describe/configuration the source
    agent.sources.r1.type = exec
    agent.sources.r1.command = tail -f /root/test.log
    agent.sources.r1.shell = /bin/bash -c 
    
    ## kafka
    #Describe the sink
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.topic = kafkatest
    agent.sinks.k1.brokerList = master:9092
    agent.sinks.k1.requiredAcks = 1
    agent.sinks.k1.batchSize = 2
    
    # Use a channel which buffers events in memory 
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    #agent.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1
    agent.sinks.k1.channel = c1
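
    Note: from Flume 1.7 onwards the Kafka sink documents new-style property names (kafka.bootstrap.servers, kafka.topic, kafka.producer.acks); the legacy names used above are still translated for backward compatibility but may log deprecation warnings. A sketch of the equivalent sink block, if the newer names are preferred:

    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.kafka.bootstrap.servers = master:9092
    agent.sinks.k1.kafka.topic = kafkatest
    agent.sinks.k1.kafka.producer.acks = 1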

    Verification:

    Start Kafka and create the topic:

    /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null &
    /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest
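
    Optionally confirm the topic exists before starting Flume:

    /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181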

    Start Flume and test:

    Server:
    /usr/local/flume/bin/flume-ng agent -f flume-exec-kafka.conf -n agent -Dflume.root.logger=INFO,console

    Client:
    echo "wangzai" > /root/test.log

    Start the Kafka console consumer:

    /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning

    The results are shown as screenshots in the original post, for both the Flume server console and the Kafka console consumer.

  • Original post: https://www.cnblogs.com/654wangzai321/p/9692595.html